package com.arpnetworking.metrics.proxy.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
import com.arpnetworking.logback.annotations.LogValue;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.proxy.models.messages.Command;
import com.arpnetworking.metrics.proxy.models.messages.Connect;
import com.arpnetworking.metrics.proxy.models.messages.MetricReport;
import com.arpnetworking.metrics.proxy.models.protocol.MessageProcessorsFactory;
import com.arpnetworking.metrics.proxy.models.protocol.MessagesProcessor;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.LogValueMapFactory;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.arpnetworking.tsdcore.model.AggregatedData;
import com.arpnetworking.tsdcore.model.Key;
import com.arpnetworking.tsdcore.model.PeriodicData;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;

/* loaded from: input_file:com/arpnetworking/metrics/proxy/actors/Connection.class */
public class Connection extends AbstractActor {
    private ActorRef _telemetry;
    private ActorRef _channel;
    private final PeriodicMetrics _metrics;
    private final List<MessagesProcessor> _messageProcessors;
    private final Map<String, Map<String, Set<String>>> _subscriptions = Maps.newHashMap();
    private static final String METRICS_PREFIX = "actors/connection/";
    private static final String UNKNOWN_COMMAND_COUNTER = "actors/connection/command/UNKNOWN";
    private static final String UNKNOWN_COUNTER = "actors/connection/UNKNOWN";
    private static final ObjectMapper OBJECT_MAPPER;
    private static final Logger LOGGER;
    private static final JoinPoint.StaticPart ajc$tjp_0 = null;
    private static final JoinPoint.StaticPart ajc$tjp_1 = null;
    private static final JoinPoint.StaticPart ajc$tjp_2 = null;
    private static final JoinPoint.StaticPart ajc$tjp_3 = null;
    private static final JoinPoint.StaticPart ajc$tjp_4 = null;
    private static final JoinPoint.StaticPart ajc$tjp_5 = null;
    private static final JoinPoint.StaticPart ajc$tjp_6 = null;
    private static final JoinPoint.StaticPart ajc$tjp_7 = null;
    private static final JoinPoint.StaticPart ajc$tjp_8 = null;

    static {
        ajc$preClinit();
        OBJECT_MAPPER = ObjectMapperFactory.getInstance();
        LOGGER = LoggerFactory.getLogger(Connection.class);
    }

    public Connection(PeriodicMetrics periodicMetrics, MessageProcessorsFactory messageProcessorsFactory) {
        this._metrics = periodicMetrics;
        this._messageProcessors = messageProcessorsFactory.create(this, periodicMetrics);
    }

    public static Props props(PeriodicMetrics periodicMetrics, MessageProcessorsFactory messageProcessorsFactory) {
        return Props.create(Connection.class, new Object[]{periodicMetrics, messageProcessorsFactory});
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(Connect.class, connect -> {
            LogBuilder addData = LOGGER.info().setMessage("Connected stream").addData("actor", self()).addData("data", connect);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, addData));
            addData.log();
            this._telemetry = connect.getTelemetry();
            this._channel = connect.getChannel();
        }).match(Status.Failure.class, failure -> {
            LogBuilder addData = LOGGER.info().setMessage("Closing stream").addData("actor", self()).addData("data", failure);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_5, this, addData));
            addData.log();
            getSelf().tell(PoisonPill.getInstance(), getSelf());
        }).match(PeriodicData.class, periodicData -> {
            processPeriodicData(periodicData);
        }).matchAny(obj -> {
            if (this._channel == null) {
                LogBuilder addData = LOGGER.warn().setMessage("Unable to process message").addData("reason", "channel actor not materialized").addData("actor", self()).addData("data", obj);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_6, this, addData));
                addData.log();
                return;
            }
            boolean z = false;
            Object command = obj instanceof Message ? new Command(OBJECT_MAPPER.readTree(((Message) obj).asTextMessage().getStrictText())) : obj;
            Iterator<MessagesProcessor> it = this._messageProcessors.iterator();
            while (it.hasNext()) {
                z = it.next().handleMessage(command);
                if (z) {
                    break;
                }
            }
            if (z) {
                return;
            }
            this._metrics.recordCounter(UNKNOWN_COUNTER, 1L);
            if (obj instanceof Command) {
                this._metrics.recordCounter(UNKNOWN_COMMAND_COUNTER, 1L);
                LogBuilder addData2 = LOGGER.warn().setMessage("Unable to process message").addData("reason", "unsupported command").addData("actor", self()).addData("data", obj);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_7, this, addData2));
                addData2.log();
                unhandled(obj);
                return;
            }
            this._metrics.recordCounter("Actors/Connection/UNKNOWN", 1L);
            LogBuilder addData3 = LOGGER.warn().setMessage("Unable to process message").addData("reason", "unsupported message").addData("actor", self()).addData("data", obj);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_8, this, addData3));
            addData3.log();
            unhandled(obj);
        }).build();
    }

    public void send(ObjectNode objectNode) {
        try {
            this._channel.tell(TextMessage.create(OBJECT_MAPPER.writeValueAsString(objectNode)), self());
        } catch (JsonProcessingException e) {
            LogBuilder throwable = LOGGER.error().setMessage("Unable to send message").addData("reason", "serialization exception").addData("actor", self()).addData("data", objectNode).setThrowable(e);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, throwable));
            throwable.log();
        }
    }

    public void sendCommand(String str, ObjectNode objectNode) {
        ObjectNode objectNode2 = JsonNodeFactory.instance.objectNode();
        objectNode2.put("command", str);
        objectNode2.set("data", objectNode);
        send(objectNode2);
    }

    public void subscribe(String str, String str2, String str3) {
        if (!this._subscriptions.containsKey(str)) {
            this._subscriptions.put(str, Maps.newHashMap());
        }
        Map<String, Set<String>> map = this._subscriptions.get(str);
        if (!map.containsKey(str2)) {
            map.put(str2, Sets.newHashSet());
        }
        Set<String> set = map.get(str2);
        if (set.contains(str3)) {
            return;
        }
        set.add(str3);
    }

    public void unsubscribe(String str, String str2, String str3) {
        if (this._subscriptions.containsKey(str)) {
            Map<String, Set<String>> map = this._subscriptions.get(str);
            if (map.containsKey(str2)) {
                Set<String> set = map.get(str2);
                if (set.contains(str3)) {
                    set.remove(str3);
                }
            }
        }
    }

    public ActorRef getTelemetry() {
        return this._telemetry;
    }

    @LogValue
    public Object toLogValue() {
        return LogValueMapFactory.builder(this).put("connection", this._channel).put("messageProcessors", this._messageProcessors).put("subscriptions", this._subscriptions).build();
    }

    public String toString() {
        return toLogValue().toString();
    }

    private void processPeriodicData(PeriodicData periodicData) {
        Key dimensions = periodicData.getDimensions();
        String host = dimensions.getHost();
        String service = dimensions.getService();
        Map<String, Set<String>> map = this._subscriptions.get(service);
        if (map == null) {
            LogBuilder addData = LOGGER.trace().setMessage("Not sending MetricReport").addData("reason", "service not found in subscriptions").addData(Key.SERVICE_DIMENSION_KEY, service);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, addData));
            addData.log();
            return;
        }
        Iterator it = periodicData.getData().entries().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            String str = (String) entry.getKey();
            AggregatedData aggregatedData = (AggregatedData) entry.getValue();
            Set<String> set = map.get(str);
            if (set == null) {
                LogBuilder addData2 = LOGGER.trace().setMessage("Not sending MetricReport").addData("reason", "metric not found in subscriptions").addData("metric", str);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addData2));
                addData2.log();
            } else {
                String name = aggregatedData.getStatistic().getName();
                if (set.contains(name)) {
                    MetricReport metricReport = new MetricReport(service, host, name, str, aggregatedData.getValue().getValue(), aggregatedData.getValue().getUnit(), periodicData.getStart());
                    Iterator<MessagesProcessor> it2 = this._messageProcessors.iterator();
                    while (it2.hasNext() && !it2.next().handleMessage(metricReport)) {
                    }
                } else {
                    LogBuilder addData3 = LOGGER.trace().setMessage("Not sending MetricReport").addData("reason", "statistic not found in subscriptions").addData("statistic", name);
                    LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, addData3));
                    addData3.log();
                }
            }
        }
    }

    private static void ajc$preClinit() {
        Factory factory = new Factory("Connection.java", Connection.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 176);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 277);
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 291);
        ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 301);
        ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 92);
        ajc$tjp_5 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 104);
        ajc$tjp_6 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 117);
        ajc$tjp_7 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 144);
        ajc$tjp_8 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 153);
    }
}
