package com.netflix.atlas.stream;

import akka.NotUsed$;
import akka.actor.ActorSystem;
import akka.stream.AbruptTerminationException;
import akka.stream.Materializer$;
import akka.stream.ThrottleMode$Shaping$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source$;
import com.netflix.atlas.akka.StreamOps;
import com.netflix.atlas.akka.StreamOps$;
import com.netflix.atlas.eval.stream.Evaluator;
import com.netflix.iep.service.AbstractService;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Registry;
import com.typesafe.config.Config;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.reactivestreams.Publisher;
import scala.MatchError;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.mutable.Set;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: EvalService.scala */
@ScalaSignature(bytes = "\u0006\u0005\t-f\u0001B\u001a5\u0001uB\u0001B\u0014\u0001\u0003\u0006\u0004%\ta\u0014\u0005\t+\u0002\u0011\t\u0011)A\u0005!\"Aa\u000b\u0001BC\u0002\u0013\u0005q\u000b\u0003\u0005a\u0001\t\u0005\t\u0015!\u0003Y\u0011!\t\u0007A!b\u0001\n\u0003\u0011\u0007\u0002\u00036\u0001\u0005\u0003\u0005\u000b\u0011B2\t\u0011-\u0004!Q1A\u0005\u00041D\u0001\"\u001e\u0001\u0003\u0002\u0003\u0006I!\u001c\u0005\u0006m\u0002!\ta\u001e\u0005\n\u0003\u001f\u0001!\u0019!C\u0006\u0003#A\u0001\"a\t\u0001A\u0003%\u00111\u0003\u0005\n\u0003K\u0001!\u0019!C\u0005\u0003OA\u0001B!\r\u0001A\u0003%\u0011\u0011\u0006\u0005\n\u0005g\u0001!\u0019!C\u0005\u0005kA\u0001Ba\u0011\u0001A\u0003%!q\u0007\u0005\n\u0005\u000b\u0002!\u0019!C\u0005\u0005\u000fB\u0001Ba\u0014\u0001A\u0003%!\u0011\n\u0005\n\u0005#\u0002!\u0019!C\u0005\u0005'B\u0001Ba\u0017\u0001A\u0003%!Q\u000b\u0005\b\u0005;\u0002A\u0011\u0001B*\u0011\u001d\u0011y\u0006\u0001C!\u0003\u0017DqA!\u0019\u0001\t\u0003\nY\rC\u0004\u0003d\u0001!I!a3\t\u000f\t\u0015\u0004\u0001\"\u0003\u0003h!9!1\u000f\u0001\u0005\u0002\tU\u0004b\u0002BJ\u0001\u0011\u0005!Q\u0013\u0005\b\u00053\u0003A\u0011\u0001BN\u0011\u001d\u0011\t\u000b\u0001C\t\u0005GCqAa*\u0001\t\u0013\u0011IkB\u0004\u0002fQB\t!a\u001a\u0007\rM\"\u0004\u0012AA5\u0011\u00191x\u0004\"\u0001\u0002t\u00191\u0011QO\u0010\u0001\u0003oB!\"!\u001f\"\u0005\u000b\u0007I\u0011AA>\u0011)\ti/\tB\u0001B\u0003%\u0011Q\u0010\u0005\u000b\u0003_\f#\u00111A\u0005\u0002\u0005E\bBCA��C\t\u0005\r\u0011\"\u0001\u0003\u0002!Q!qA\u0011\u0003\u0002\u0003\u0006K!a=\t\rY\fC\u0011\u0001B\u0005\u000f%\u0011\tbHA\u0001\u0012\u0003\u0011\u0019BB\u0005\u0002v}\t\t\u0011#\u0001\u0003\u0016!1a/\u000bC\u0001\u0005/A\u0011B!\u0007*#\u0003%\tAa\u0007\u0007\r\u0005\u0005u\u0004AAB\u0011)\t9\t\fB\u0001B\u0003%\u00111\b\u0005\u000b\u0003\u0013c#\u0011!Q\u0001\n\u0005-\u0005B\u0002<-\t\u0003\t\u0019\fC\u0004\u0002<2\"\t!!0\t\u000f\u0005%G\u0006\"\u0001\u0002L\"9\u00111\u001b\u0017\u0005B\u0005U'aC#wC2\u001cVM\u001
d<jG\u0016T!!\u000e\u001c\u0002\rM$(/Z1n\u0015\t9\u0004(A\u0003bi2\f7O\u0003\u0002:u\u00059a.\u001a;gY&D(\"A\u001e\u0002\u0007\r|Wn\u0001\u0001\u0014\u0007\u0001qd\t\u0005\u0002@\t6\t\u0001I\u0003\u0002B\u0005\u000691/\u001a:wS\u000e,'BA\"9\u0003\rIW\r]\u0005\u0003\u000b\u0002\u0013q\"\u00112tiJ\f7\r^*feZL7-\u001a\t\u0003\u000f2k\u0011\u0001\u0013\u0006\u0003\u0013*\u000bAb]2bY\u0006dwnZ4j]\u001eT!a\u0013\u001e\u0002\u0011QL\b/Z:bM\u0016L!!\u0014%\u0003\u001bM#(/[2u\u0019><w-\u001b8h\u0003\u0019\u0019wN\u001c4jOV\t\u0001\u000b\u0005\u0002R'6\t!K\u0003\u0002O\u0015&\u0011AK\u0015\u0002\u0007\u0007>tg-[4\u0002\u000f\r|gNZ5hA\u0005A!/Z4jgR\u0014\u00180F\u0001Y!\tIf,D\u0001[\u0015\tYF,A\u0002ba&T!!\u0018\u001d\u0002\u0013M\u0004Xm\u0019;bi>\u0014\u0018BA0[\u0005!\u0011VmZ5tiJL\u0018!\u0003:fO&\u001cHO]=!\u0003%)g/\u00197vCR|'/F\u0001d!\t!\u0007.D\u0001f\u0015\t)dM\u0003\u0002hm\u0005!QM^1m\u0013\tIWMA\u0005Fm\u0006dW/\u0019;pe\u0006QQM^1mk\u0006$xN\u001d\u0011\u0002\rML8\u000f^3n+\u0005i\u0007C\u00018t\u001b\u0005y'B\u00019r\u0003\u0015\t7\r^8s\u0015\u0005\u0011\u0018\u0001B1lW\u0006L!\u0001^8\u0003\u0017\u0005\u001bGo\u001c:TsN$X-\\\u0001\bgf\u001cH/Z7!\u0003\u0019a\u0014N\\5u}Q)\u0001P_>}{B\u0011\u0011\u0010A\u0007\u0002i!)a*\u0003a\u0001!\")a+\u0003a\u00011\")\u0011-\u0003a\u0001G\")1.\u0003a\u0002[\"\u0012\u0011b 
\t\u0005\u0003\u0003\tY!\u0004\u0002\u0002\u0004)!\u0011QAA\u0004\u0003\u0019IgN[3di*\u0011\u0011\u0011B\u0001\u0006U\u00064\u0018\r_\u0005\u0005\u0003\u001b\t\u0019A\u0001\u0004J]*,7\r^\u0001\u0003K\u000e,\"!a\u0005\u0011\t\u0005U\u0011qD\u0007\u0003\u0003/QA!!\u0007\u0002\u001c\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0005\u0005u\u0011!B:dC2\f\u0017\u0002BA\u0011\u0003/\u0011\u0001$\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;Fq\u0016\u001cW\u000f^8s\u0003\r)7\rI\u0001\u000ee\u0016<\u0017n\u001d;sCRLwN\\:\u0016\u0005\u0005%\u0002\u0003CA\u0016\u0003o\tY$!\u0015\u000e\u0005\u00055\"\u0002BA\r\u0003_QA!!\r\u00024\u0005!Q\u000f^5m\u0015\t\t)$\u0001\u0003kCZ\f\u0017\u0002BA\u001d\u0003[\u0011\u0011cQ8oGV\u0014(/\u001a8u\u0011\u0006\u001c\b.T1q!\u0011\ti$a\u0013\u000f\t\u0005}\u0012q\t\t\u0005\u0003\u0003\nY\"\u0004\u0002\u0002D)\u0019\u0011Q\t\u001f\u0002\rq\u0012xn\u001c;?\u0013\u0011\tI%a\u0007\u0002\rA\u0013X\rZ3g\u0013\u0011\ti%a\u0014\u0003\rM#(/\u001b8h\u0015\u0011\tI%a\u0007\u0011\u0007\u0005M\u0013ED\u0002\u0002VyqA!a\u0016\u0002d9!\u0011\u0011LA1\u001d\u0011\tY&a\u0018\u000f\t\u0005\u0005\u0013QL\u0005\u0002w%\u0011\u0011HO\u0005\u0003oaJ!!\u000e\u001c\u0002\u0017\u00153\u0018\r\\*feZL7-\u001a\t\u0003s~\u00192aHA6!\u0011\ti'a\u001c\u000e\u0005\u0005m\u0011\u0002BA9\u00037\u0011a!\u00118z%\u00164GCAA4\u0005)\u0019FO]3b[&sgm\\\n\u0004C\u0005-\u0014a\u00025b]\u0012dWM]\u000b\u0003\u0003{\u0002R!a 
-\u0003/l\u0011a\b\u0002\r#V,W/\u001a%b]\u0012dWM]\u000b\u0005\u0003\u000b\u000b\tk\u0005\u0003-\u0003W2\u0015AA5e\u0003\u0015\tX/Z;f!\u0019\ti)a&\u0002\u001e:!\u0011qRAJ\u001b\t\t\tJ\u0003\u0002sm%!\u0011QSAI\u0003%\u0019FO]3b[>\u00038/\u0003\u0003\u0002\u001a\u0006m%aC*pkJ\u001cW-U;fk\u0016TA!!&\u0002\u0012B!\u0011qTAQ\u0019\u0001!q!a)-\u0005\u0004\t)KA\u0001U#\u0011\t9+!,\u0011\t\u00055\u0014\u0011V\u0005\u0005\u0003W\u000bYBA\u0004O_RD\u0017N\\4\u0011\t\u00055\u0014qV\u0005\u0005\u0003c\u000bYBA\u0002B]f$b!!.\u00028\u0006e\u0006#BA@Y\u0005u\u0005bBAD_\u0001\u0007\u00111\b\u0005\b\u0003\u0013{\u0003\u0019AAF\u0003\u0015ygMZ3s)\u0011\ty,!2\u0011\t\u00055\u0014\u0011Y\u0005\u0005\u0003\u0007\fYBA\u0004C_>dW-\u00198\t\u000f\u0005\u001d\u0007\u00071\u0001\u0002\u001e\u0006\u0019Qn]4\u0002\u0011\r|W\u000e\u001d7fi\u0016$\"!!4\u0011\t\u00055\u0014qZ\u0005\u0005\u0003#\fYB\u0001\u0003V]&$\u0018\u0001\u0003;p'R\u0014\u0018N\\4\u0015\u0005\u0005m\u0002\u0003BAm\u0003OtA!a7\u0002d:!\u0011Q\\Aq\u001d\u0011\t9&a8\n\u0005\u001d4\u0014BA\u001bg\u0013\r\t)/Z\u0001\n\u000bZ\fG.^1u_JLA!!;\u0002l\nyQ*Z:tC\u001e,WI\u001c<fY>\u0004XMC\u0002\u0002f\u0016\f\u0001\u0002[1oI2,'\u000fI\u0001\fI\u0006$\u0018mU8ve\u000e,7/\u0006\u0002\u0002tB1\u0011QNA{\u0003sLA!a>\u0002\u001c\t1q\n\u001d;j_:\u0004B!!7\u0002|&!\u0011Q`Av\u0005-!\u0015\r^1T_V\u00148-Z:\u0002\u001f\u0011\fG/Y*pkJ\u001cWm]0%KF$B!!4\u0003\u0004!I!QA\u0013\u0002\u0002\u0003\u0007\u00111_\u0001\u0004q\u0012\n\u0014\u0001\u00043bi\u0006\u001cv.\u001e:dKN\u0004CC\u0002B\u0006\u0005\u001b\u0011y\u0001E\u0002\u0002��\u0005Bq!!\u001f(\u0001\u0004\ti\bC\u0005\u0002p\u001e\u0002\n\u00111\u0001\u0002t\u0006Q1\u000b\u001e:fC6LeNZ8\u0011\u0007\u0005}\u0014fE\u0002*\u0003W\"\"Aa\u0005\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\u0011iB\u000b\u0003\u0002t\n}1F\u0001B\u0011!\u0011\u0011\u0019C!\f\u000e\u0005\t\u0015\"\u0002\u0002B\u0014\u0005S\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\t\t-\u00121D\u0001\u
000bC:tw\u000e^1uS>t\u0017\u0002\u0002B\u0018\u0005K\u0011\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00039\u0011XmZ5tiJ\fG/[8og\u0002\naB\\;n\t\u0006$\u0018mU8ve\u000e,7/\u0006\u0002\u00038A!!\u0011\bB \u001b\t\u0011YD\u0003\u0003\u0003>\u00055\u0012AB1u_6L7-\u0003\u0003\u0003B\tm\"!D!u_6L7-\u00138uK\u001e,'/A\bok6$\u0015\r^1T_V\u00148-Z:!\u0003QqW/\u001c#bi\u0006\u001cv.\u001e:dK\u0012K7\u000f^*v[V\u0011!\u0011\n\t\u00043\n-\u0013b\u0001B'5\n\u0019B)[:ue&\u0014W\u000f^5p]N+X.\\1ss\u0006)b.^7ECR\f7k\\;sG\u0016$\u0015n\u001d;Tk6\u0004\u0013!C9vKV,7+\u001b>f+\t\u0011)\u0006\u0005\u0003\u0002n\t]\u0013\u0002\u0002B-\u00037\u00111!\u00138u\u0003)\tX/Z;f'&TX\rI\u0001\u0012O\u0016$h*^7ECR\f7k\\;sG\u0016\u001c\u0018!C:uCJ$\u0018*\u001c9m\u0003!\u0019Ho\u001c9J[Bd\u0017aB2mK\u0006tW\u000f]\u0001\u0012I&\u001cHO]5ckR,W*Z:tC\u001e,G\u0003\u0002B5\u0005_\u0002B!!\u001c\u0003l%!!QNA\u000e\u0005\u0019\te.\u001f,bY\"9!\u0011\u000f\rA\u0002\u0005]\u0017\u0001C3om\u0016dw\u000e]3\u0002\u0011I,w-[:uKJ$BAa\u001e\u0003\u0010BA\u0011Q\u000eB=\u0005{\u0012y(\u0003\u0003\u0003|\u0005m!A\u0002+va2,'\u0007\u0005\u0004\u0002\u000e\u0006]\u0015q\u001b\t\u0007\u0005\u0003\u0013Y)a6\u000e\u0005\t\r%\u0002\u0002BC\u0005\u000f\u000bqB]3bGRLg/Z:ue\u0016\fWn\u001d\u0006\u0003\u0005\u0013\u000b1a\u001c:h\u0013\u0011\u0011iIa!\u0003\u0013A+(\r\\5tQ\u0016\u0014\bb\u0002BI3\u0001\u0007\u00111H\u0001\tgR\u0014X-Y7JI\u0006QQO\u001c:fO&\u001cH/\u001a:\u0015\t\u00055'q\u0013\u0005\b\u0005#S\u0002\u0019AA\u001e\u0003E)\b\u000fZ1uK\u0012\u000bG/Y*pkJ\u001cWm\u001d\u000b\u0007\u0003\u001b\u0014iJa(\t\u000f\tE5\u00041\u0001\u0002<!9\u0011q^\u000eA\u0002\u0005e\u0018!D4fiN#(/Z1n\u0013:4w\u000e\u0006\u0003\u0002R\t\u0015\u0006b\u0002BI9\u0001\u0007\u00111H\u0001\u0016O\u0016$8)\u001e:sK:$H)\u0019;b'>,(oY3t+\t\tI\u0010")
/* loaded from: input_file:com/netflix/atlas/stream/EvalService.class */
public class EvalService extends AbstractService implements StrictLogging {
    // Configuration supplied to the constructor; the queue size is read from it.
    private final Config config;
    // Spectator registry; used to create the data-source distribution summary and
    // handed to StreamOps.blockingQueue when a stream is registered.
    private final Registry registry;
    // Provides the flow (createStreamsFlow) that the global eval stream runs through.
    private final Evaluator evaluator;
    // Actor system from which the stream materializer is obtained.
    private final ActorSystem system;
    // Execution context for the completion callback of the global stream
    // (initialized to the global ExecutionContext in the constructor).
    private final ExecutionContextExecutor ec;
    // Registered streams, keyed by stream id.
    private final ConcurrentHashMap<String, StreamInfo> registrations;
    // Size of the data-source set computed by the last getCurrentDataSources() call.
    private final AtomicInteger numDataSources;
    // Distribution summary "evalService.numDataSource"; records that same size on every call.
    private final DistributionSummary numDataSourceDistSum;
    // Value of "atlas.stream.eval-service.queue-size"; passed to the blocking queue in register().
    private final int queueSize;
    // Assigned through the StrictLogging setter invoked by StrictLogging.$init$.
    private Logger logger;

    /* compiled from: EvalService.scala */
    /* loaded from: input_file:com/netflix/atlas/stream/EvalService$QueueHandler.class */
    /**
     * Pairs a stream id with the source queue that feeds that stream, and logs
     * when the queue is completed.
     *
     * @param <T> type of the messages accepted by the queue
     */
    public static class QueueHandler<T> implements StrictLogging {
        private final String id;
        private final StreamOps.SourceQueue<T> queue;
        private Logger logger;

        /**
         * @param str         id of the stream this handler belongs to (used in log output and toString)
         * @param sourceQueue queue that receives the messages offered to this handler
         */
        public QueueHandler(String str, StreamOps.SourceQueue<T> sourceQueue) {
            this.id = str;
            this.queue = sourceQueue;
            // Trait initializer: installs the logger through the synthetic setter below.
            StrictLogging.$init$(this);
            Statics.releaseFence();
        }

        /** Returns the logger installed by the StrictLogging initializer. */
        public Logger logger() {
            return logger;
        }

        /** Synthetic setter used by StrictLogging to install the logger. */
        public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
            this.logger = logger;
        }

        /**
         * Offers a message to the underlying queue.
         *
         * @return whatever the queue's own offer reports
         */
        public boolean offer(T t) {
            return queue.offer(t);
        }

        /** Logs the completion at info level (when enabled) and then completes the queue. */
        public void complete() {
            Logger log = logger();
            if (log.underlying().isInfoEnabled()) {
                log.underlying().info("queue complete for: {}", id);
            }
            queue.complete();
        }

        @Override
        public String toString() {
            return "QueueHandler(" + id + ")";
        }
    }

    /* compiled from: EvalService.scala */
    /* loaded from: input_file:com/netflix/atlas/stream/EvalService$StreamInfo.class */
    /**
     * Per-stream registration record: the handler that delivers messages to the
     * stream plus the data sources most recently supplied for it.
     */
    public static class StreamInfo {
        private final QueueHandler<Evaluator.MessageEnvelope> handler;
        // volatile: replaced by updateDataSources() and read by getCurrentDataSources(),
        // which is driven by the global eval stream; without it the reader has no
        // visibility guarantee for a newly assigned value.
        private volatile Option<Evaluator.DataSources> dataSources;

        /**
         * @param queueHandler handler used to deliver messages to the stream
         * @param option       initial data sources for the stream
         */
        public StreamInfo(QueueHandler<Evaluator.MessageEnvelope> queueHandler, Option<Evaluator.DataSources> option) {
            this.handler = queueHandler;
            this.dataSources = option;
        }

        /** Returns the handler used to deliver messages to this stream. */
        public QueueHandler<Evaluator.MessageEnvelope> handler() {
            return this.handler;
        }

        /** Returns the data sources currently associated with this stream. */
        public Option<Evaluator.DataSources> dataSources() {
            return this.dataSources;
        }

        /** Replaces the data sources associated with this stream. */
        public void dataSources_$eq(Option<Evaluator.DataSources> option) {
            this.dataSources = option;
        }
    }

    /** Returns the logger installed by the StrictLogging initializer. */
    public Logger logger() {
        return logger;
    }

    /** Synthetic setter used by StrictLogging to install the logger. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /** Returns the configuration passed to the constructor. */
    public Config config() {
        return config;
    }

    /** Returns the registry passed to the constructor. */
    public Registry registry() {
        return registry;
    }

    /** Returns the evaluator passed to the constructor. */
    public Evaluator evaluator() {
        return evaluator;
    }

    /** Returns the actor system passed to the constructor. */
    public ActorSystem system() {
        return system;
    }

    /** Execution context on which the global stream's completion callback runs. */
    private ExecutionContextExecutor ec() {
        return ec;
    }

    /** Map of registered streams keyed by stream id. */
    private ConcurrentHashMap<String, StreamInfo> registrations() {
        return registrations;
    }

    /** Counter holding the size of the most recently computed data-source set. */
    private AtomicInteger numDataSources() {
        return numDataSources;
    }

    /** Distribution summary that records the data-source set size. */
    private DistributionSummary numDataSourceDistSum() {
        return numDataSourceDistSum;
    }

    /** Queue size used when creating a stream's blocking queue. */
    private int queueSize() {
        return queueSize;
    }

    /** Returns the number of data sources seen by the last getCurrentDataSources() call. */
    public int getNumDataSources() {
        return numDataSources.get();
    }

    public void startImpl() {
        if (logger().underlying().isDebugEnabled()) {
            logger().underlying().debug("Starting service");
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        cleanup();
        Source$.MODULE$.repeat(NotUsed$.MODULE$).throttle(1, new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds(), 1, ThrottleMode$Shaping$.MODULE$).map(notUsed$ -> {
            return this.getCurrentDataSources();
        }).mapMaterializedValue(notUsed -> {
            return NotUsed$.MODULE$;
        }).via(evaluator().createStreamsFlow()).runForeach(messageEnvelope -> {
            this.distributeMessage(messageEnvelope);
            return BoxedUnit.UNIT;
        }, Materializer$.MODULE$.matFromSystem(system())).onComplete(r4 -> {
            $anonfun$startImpl$4(this, r4);
            return BoxedUnit.UNIT;
        }, ec());
    }

    /** Stops the service by completing every registered handler and clearing the registrations. */
    public void stopImpl() {
        this.cleanup();
    }

    /** Completes the handler of every registered stream, then removes all registrations. */
    private void cleanup() {
        for (StreamInfo registered : registrations().values()) {
            registered.handler().complete();
        }
        registrations().clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Routes a message produced by the global eval stream to the stream it belongs to.
     *
     * <p>The envelope id is expected to have the form {@code <streamId>|<originalId>}.
     * The part before the first {@code |} selects the registered stream; the message is
     * re-wrapped with the remainder as its id and offered to that stream's handler.
     * Messages with no separator (or an empty stream id) and messages for unknown
     * streams are dropped with a debug log. Any exception is logged at debug level and
     * swallowed so that one bad message cannot fail the global stream.
     *
     * @param messageEnvelope message whose id carries the stream-id prefix
     * @return the boxed result of the handler's {@code offer} when the message was
     *         delivered, otherwise {@code BoxedUnit.UNIT}
     */
    public Object distributeMessage(Evaluator.MessageEnvelope messageEnvelope) {
        try {
            String compositeId = messageEnvelope.getId();
            int separator = compositeId.indexOf('|');
            if (separator <= 0) {
                if (logger().underlying().isDebugEnabled()) {
                    logger().underlying().debug("discarding message without streamId: {}", messageEnvelope);
                }
                return BoxedUnit.UNIT;
            }
            StreamInfo streamInfo = getStreamInfo(compositeId.substring(0, separator));
            if (streamInfo == null) {
                if (logger().underlying().isDebugEnabled()) {
                    logger().underlying().debug("discarding message without handler: {}", messageEnvelope);
                }
                return BoxedUnit.UNIT;
            }
            // Strip the "<streamId>|" prefix so the receiver sees the id it originally supplied.
            Evaluator.MessageEnvelope unwrapped =
                new Evaluator.MessageEnvelope(compositeId.substring(separator + 1), messageEnvelope.getMessage());
            return BoxesRunTime.boxToBoolean(streamInfo.handler().offer(unwrapped));
        } catch (Exception e) {
            if (logger().underlying().isDebugEnabled()) {
                logger().underlying().debug("error distributing message: " + messageEnvelope, e);
            }
            return BoxedUnit.UNIT;
        }
    }

    /**
     * Registers a new stream and creates the queue/publisher pair that serves it.
     *
     * <p>A blocking queue of {@code queueSize()} elements is materialized together with
     * a publisher; the queue is wrapped in a {@link QueueHandler} and stored under the
     * given id.
     *
     * @param str id of the stream to register
     * @return the source queue and the publisher created for the stream
     * @throws IllegalArgumentException if a stream with the same id is already registered
     */
    public Tuple2<StreamOps.SourceQueue<Evaluator.MessageEnvelope>, Publisher<Evaluator.MessageEnvelope>> register(String str) {
        Tuple2 materialized = (Tuple2) StreamOps$.MODULE$.blockingQueue(registry(), "EvalService", queueSize()).toMat(Sink$.MODULE$.asPublisher(true), Keep$.MODULE$.both()).run(Materializer$.MODULE$.matFromSystem(system()));
        if (materialized == null) {
            throw new MatchError(materialized);
        }
        StreamOps.SourceQueue sourceQueue = (StreamOps.SourceQueue) materialized._1();
        Publisher publisher = (Publisher) materialized._2();
        StreamInfo candidate = new StreamInfo(new QueueHandler(str, sourceQueue), EvalService$StreamInfo$.MODULE$.$lessinit$greater$default$2());
        if (registrations().putIfAbsent(str, candidate) != null) {
            // The queue was materialized before the duplicate was detected and nobody
            // else holds a reference to it; complete it so it is not left open.
            sourceQueue.complete();
            throw new IllegalArgumentException("stream with id '" + str + "' already registered");
        }
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("stream registered: {}", str);
        }
        return new Tuple2<>(sourceQueue, publisher);
    }

    /**
     * Removes the registration for the given stream id and completes its handler.
     * Unknown ids are ignored; any exception is logged at error level and not rethrown.
     *
     * @param str id of the stream to unregister
     */
    public void unregister(String str) {
        try {
            StreamInfo removed = registrations().remove(str);
            if (removed == null) {
                return;
            }
            removed.handler().complete();
        } catch (Exception e) {
            if (logger().underlying().isErrorEnabled()) {
                logger().underlying().error("Error unregistering stream " + str, e);
            }
        }
    }

    /**
     * Replaces the data sources of a registered stream.
     *
     * <p>Each data source is copied with its id rewritten to {@code <streamId>|<id>},
     * keeping its step and uri.
     *
     * @param str         id of a previously registered stream
     * @param dataSources new data sources for that stream
     * @throws IllegalStateException if no stream is registered under {@code str}
     */
    public void updateDataSources(String str, Evaluator.DataSources dataSources) {
        StreamInfo target = getStreamInfo(str);
        if (target == null) {
            throw new IllegalStateException("stream has not been registered: " + str);
        }
        Set<Evaluator.DataSource> originals = CollectionConverters$.MODULE$.SetHasAsScala(dataSources.getSources()).asScala();
        Set prefixed = (Set) originals.map(dataSource -> {
            String qualifiedId = str + "|" + dataSource.getId();
            return new Evaluator.DataSource(qualifiedId, dataSource.getStep(), dataSource.getUri());
        });
        Evaluator.DataSources rewritten = new Evaluator.DataSources(CollectionConverters$.MODULE$.MutableSetHasAsJava(prefixed).asJava());
        target.dataSources_$eq(new Some(rewritten));
    }

    /**
     * Looks up the registration for a stream id.
     *
     * @param str stream id
     * @return the registered {@link StreamInfo}, or {@code null} if the id is not registered
     */
    public StreamInfo getStreamInfo(String str) {
        return this.registrations.get(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the combined set of data sources across all registered streams and
     * publishes its size to the two data-source metrics.
     *
     * <p>Invoked by the global eval stream in {@code startImpl()} for every element
     * of the throttled source.
     *
     * @return a {@code DataSources} wrapping the combined set
     */
    public Evaluator.DataSources getCurrentDataSources() {
        // First flatMap: one Option<DataSources> per registration (presumably empty
        // Options contribute nothing, per Scala flatMap semantics). Second flatMap:
        // expand each DataSources into its individual sources. toSet collapses
        // duplicates before converting back to a java.util.Set.
        java.util.Set asJava = CollectionConverters$.MODULE$.SetHasAsJava(((IterableOnceOps) ((IterableOps) CollectionConverters$.MODULE$.CollectionHasAsScala(registrations().values()).asScala().flatMap(streamInfo -> {
            return streamInfo.dataSources();
        })).flatMap(dataSources -> {
            return CollectionConverters$.MODULE$.SetHasAsScala(dataSources.getSources()).asScala();
        })).toSet()).asJava();
        // Expose the current size both as a gauge-like counter and in the distribution summary.
        numDataSources().set(asJava.size());
        numDataSourceDistSum().record(asJava.size());
        return new Evaluator.DataSources(asJava);
    }

    /**
     * Completion callback of the global eval stream.
     *
     * <p>A successful completion, or a failure caused by an
     * {@link AbruptTerminationException}, is only logged as a warning. Any other
     * failure is logged as an error and the stream is started again through
     * {@code startImpl()}.
     *
     * @param evalService service whose stream completed
     * @param r5          completion result of the stream
     */
    public static final /* synthetic */ void $anonfun$startImpl$4(EvalService evalService, Try r5) {
        boolean abruptTermination = (r5 instanceof Failure)
            && (((Failure) r5).exception() instanceof AbruptTerminationException);
        if ((r5 instanceof Success) || abruptTermination) {
            if (evalService.logger().underlying().isWarnEnabled()) {
                evalService.logger().underlying().warn("Global eval stream completed");
            }
            return;
        }
        if (r5 instanceof Failure) {
            Throwable cause = ((Failure) r5).exception();
            if (evalService.logger().underlying().isErrorEnabled()) {
                evalService.logger().underlying().error("Global eval stream failed, attempting to restart", cause);
            }
            evalService.startImpl();
            return;
        }
        // Neither Success nor Failure (e.g. null): mirror the Scala pattern-match failure.
        throw new MatchError(r5);
    }

    /**
     * Creates the service. Nothing is started here; the global stream is launched by
     * {@code startImpl()}.
     *
     * @param config      configuration; must provide the int setting
     *                    {@code atlas.stream.eval-service.queue-size}
     * @param registry    registry used for the {@code evalService.numDataSource}
     *                    distribution summary and for the per-stream queues
     * @param evaluator   evaluator whose streams flow processes the data sources
     * @param actorSystem actor system used to materialize streams
     */
    @Inject
    public EvalService(Config config, Registry registry, Evaluator evaluator, ActorSystem actorSystem) {
        this.config = config;
        this.registry = registry;
        this.evaluator = evaluator;
        this.system = actorSystem;
        // Trait initializer: installs the logger through the synthetic StrictLogging setter.
        StrictLogging.$init$(this);
        // Completion callbacks of the global stream run on the global execution context.
        this.ec = ExecutionContext$.MODULE$.global();
        this.registrations = new ConcurrentHashMap<>();
        this.numDataSources = new AtomicInteger(0);
        this.numDataSourceDistSum = registry.distributionSummary("evalService.numDataSource");
        this.queueSize = config.getInt("atlas.stream.eval-service.queue-size");
        // Emitted by the Scala compiler after field initialization.
        Statics.releaseFence();
    }
}
