package com.arpnetworking.clusteraggregator.client;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.io.Tcp;
import akka.io.TcpMessage;
import akka.util.ByteString;
import com.arpnetworking.clusteraggregator.models.AggregationMode;
import com.arpnetworking.clusteraggregator.models.CombinedMetricData;
import com.arpnetworking.metrics.aggregation.protocol.Messages;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.tsdcore.model.AggregatedData;
import com.arpnetworking.tsdcore.model.AggregationMessage;
import com.arpnetworking.tsdcore.model.FQDSN;
import com.arpnetworking.tsdcore.model.PeriodicData;
import com.arpnetworking.tsdcore.statistics.Statistic;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.lang.invoke.SerializedLambda;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/arpnetworking/clusteraggregator/client/AggClientConnection.class */
public class AggClientConnection extends AbstractActor {
    private Optional<String> _hostName = Optional.empty();
    private Optional<String> _clusterName = Optional.empty();
    private ByteString _buffer = ByteString.empty();
    private final ActorRef _connection;
    private final InetSocketAddress _remoteAddress;
    private final boolean _calculateAggregates;
    private static final Logger LOGGER = LoggerFactory.getLogger(AggClientConnection.class);
    private static final Logger INCOMPLETE_RECORD_LOGGER = LoggerFactory.getRateLimitLogger(AggClientConnection.class, Duration.ofSeconds(30));

    public static Props props(ActorRef actorRef, InetSocketAddress inetSocketAddress, FiniteDuration finiteDuration, boolean z) {
        return Props.create(AggClientConnection.class, () -> {
            return new AggClientConnection(actorRef, inetSocketAddress, finiteDuration, z);
        });
    }

    public AggClientConnection(ActorRef actorRef, InetSocketAddress inetSocketAddress, FiniteDuration finiteDuration, boolean z) {
        this._connection = actorRef;
        this._remoteAddress = inetSocketAddress;
        this._calculateAggregates = z;
        getContext().watch(actorRef);
        context().system().scheduler().scheduleOnce(finiteDuration, self(), TcpMessage.close(), context().dispatcher(), self());
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(Tcp.Received.class, received -> {
            ByteString data = received.data();
            LOGGER.trace().setMessage("Received a tcp message").addData("length", Integer.valueOf(data.length())).addContext("actor", self()).log();
            this._buffer = this._buffer.concat(data);
            processMessages();
        }).match(Tcp.CloseCommand.class, closeCommand -> {
            LOGGER.debug().setMessage("Connection timeout hit, cycling connection").addData("remote", this._remoteAddress).addContext("actor", self()).log();
            if (this._connection != null) {
                this._connection.tell(closeCommand, self());
            }
        }).match(Tcp.ConnectionClosed.class, connectionClosed -> {
            getContext().stop(getSelf());
        }).match(Terminated.class, terminated -> {
            LOGGER.info().setMessage("Connection actor terminated").addData("terminated", terminated.actor()).addContext("actor", self()).log();
            if (terminated.actor().equals(this._connection)) {
                getContext().stop(getSelf());
            } else {
                unhandled(terminated);
            }
        }).build();
    }

    private void processMessages() {
        AggregationMode aggregationMode = this._calculateAggregates ? AggregationMode.PERSIST_AND_REAGGREGATE : AggregationMode.PERSIST;
        ByteString byteString = this._buffer;
        Optional<AggregationMessage> deserialize = AggregationMessage.deserialize(byteString);
        while (deserialize.isPresent()) {
            AggregationMessage aggregationMessage = deserialize.get();
            byteString = byteString.drop(aggregationMessage.getLength());
            Messages.HostIdentification message = aggregationMessage.getMessage();
            if (message instanceof Messages.HostIdentification) {
                Messages.HostIdentification hostIdentification = message;
                this._hostName = Optional.ofNullable(hostIdentification.getHostName());
                this._clusterName = Optional.ofNullable(hostIdentification.getClusterName());
                LOGGER.info().setMessage("Handshake received").addData(CombinedMetricData.HOST_KEY, this._hostName.orElse("")).addData(CombinedMetricData.CLUSTER_KEY, this._clusterName.orElse("")).addContext("actor", self()).log();
            } else if (message instanceof Messages.StatisticSetRecord) {
                Messages.StatisticSetRecord statisticSetRecord = (Messages.StatisticSetRecord) message;
                LOGGER.trace().setMessage("StatisticSet record received").addData("aggregation", statisticSetRecord).addContext("actor", self()).log();
                if (aggregationMode.shouldReaggregate()) {
                    getContext().parent().tell(statisticSetRecord, getSelf());
                }
                if (aggregationMode.shouldPersist() && statisticSetRecord.getStatisticsCount() > 0) {
                    Optional<PeriodicData> buildPeriodicData = buildPeriodicData(statisticSetRecord);
                    if (buildPeriodicData.isPresent()) {
                        getContext().parent().tell(buildPeriodicData.get(), self());
                    }
                }
            } else if (message instanceof Messages.HeartbeatRecord) {
                LOGGER.debug().setMessage("Heartbeat received").addData(CombinedMetricData.HOST_KEY, this._hostName.orElse("")).addData(CombinedMetricData.CLUSTER_KEY, this._clusterName.orElse("")).addContext("actor", self()).log();
            } else {
                LOGGER.warn().setMessage("Unknown message type").addData("type", message.getClass()).addContext("actor", self()).log();
            }
            deserialize = AggregationMessage.deserialize(byteString);
            if (!deserialize.isPresent() && byteString.length() > 4) {
                LOGGER.debug().setMessage("buffer did not deserialize completely").addData("remainingBytes", Integer.valueOf(byteString.length())).addContext("actor", self()).log();
            }
        }
        this._buffer = byteString;
    }

    private Optional<PeriodicData> buildPeriodicData(Messages.StatisticSetRecord statisticSetRecord) {
        CombinedMetricData combinedMetricData = (CombinedMetricData) CombinedMetricData.Builder.fromStatisticSetRecord(statisticSetRecord).build();
        ImmutableList.Builder builder = ImmutableList.builder();
        Map dimensionsMap = statisticSetRecord.getDimensionsMap();
        ImmutableMap.Builder builder2 = ImmutableMap.builder();
        dimensionsMap.entrySet().stream().filter(entry -> {
            return (CombinedMetricData.HOST_KEY.equals(entry.getKey()) || CombinedMetricData.SERVICE_KEY.equals(entry.getKey()) || CombinedMetricData.CLUSTER_KEY.equals(entry.getKey())) ? false : true;
        }).forEach(entry2 -> {
            builder2.put(entry2.getKey(), entry2.getValue());
        });
        Optional<String> ofNullable = Optional.ofNullable(dimensionsMap.get(CombinedMetricData.HOST_KEY));
        Optional ofNullable2 = Optional.ofNullable(dimensionsMap.get(CombinedMetricData.SERVICE_KEY));
        Optional<String> ofNullable3 = Optional.ofNullable(dimensionsMap.get(CombinedMetricData.CLUSTER_KEY));
        if (!ofNullable2.isPresent()) {
            ofNullable2 = Optional.ofNullable(statisticSetRecord.getService());
        }
        if (!ofNullable3.isPresent()) {
            ofNullable3 = Optional.ofNullable(statisticSetRecord.getCluster());
            if (!ofNullable3.isPresent()) {
                ofNullable3 = this._clusterName;
            }
        }
        if (!ofNullable.isPresent()) {
            ofNullable = this._hostName;
        }
        builder2.put(CombinedMetricData.HOST_KEY, ofNullable.orElse(""));
        builder2.put(CombinedMetricData.SERVICE_KEY, ofNullable2.orElse(""));
        builder2.put(CombinedMetricData.CLUSTER_KEY, ofNullable3.orElse(""));
        if (!ofNullable.isPresent() || !ofNullable2.isPresent() || !ofNullable3.isPresent()) {
            INCOMPLETE_RECORD_LOGGER.warn().setMessage("Cannot process StatisticSet record, missing required fields.").addData(CombinedMetricData.HOST_KEY, ofNullable).addData(CombinedMetricData.SERVICE_KEY, ofNullable2).addData(CombinedMetricData.CLUSTER_KEY, ofNullable3).log();
            return Optional.empty();
        }
        ImmutableMap<String, String> build = builder2.build();
        long computePopulationSize = CombinedMetricData.computePopulationSize(statisticSetRecord.getMetric(), combinedMetricData.getCalculatedValues());
        for (Map.Entry<Statistic, CombinedMetricData.StatisticValue> entry3 : combinedMetricData.getCalculatedValues().entrySet()) {
            builder.add((AggregatedData) new AggregatedData.Builder().setFQDSN((FQDSN) new FQDSN.Builder().setCluster(statisticSetRecord.getCluster()).setMetric(statisticSetRecord.getMetric()).setService(statisticSetRecord.getService()).setStatistic(entry3.getKey()).build()).setHost(ofNullable.get()).setIsSpecified(entry3.getValue().getUserSpecified()).setPeriod(combinedMetricData.getPeriod()).setPopulationSize(Long.valueOf(computePopulationSize)).setSamples(Collections.emptyList()).setStart(combinedMetricData.getPeriodStart()).setSupportingData(entry3.getValue().getValue().getData()).setValue(entry3.getValue().getValue().getValue()).build());
        }
        return Optional.of(new PeriodicData.Builder().setData(builder.build()).setConditions(ImmutableList.of()).setDimensions(build).setPeriod(combinedMetricData.getPeriod()).setStart(combinedMetricData.getPeriodStart()).build());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1000093509:
                if (implMethodName.equals("lambda$props$78c03ae$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/clusteraggregator/client/AggClientConnection") && serializedLambda.getImplMethodSignature().equals("(Lakka/actor/ActorRef;Ljava/net/InetSocketAddress;Lscala/concurrent/duration/FiniteDuration;Z)Lcom/arpnetworking/clusteraggregator/client/AggClientConnection;")) {
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(0);
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) serializedLambda.getCapturedArg(1);
                    FiniteDuration finiteDuration = (FiniteDuration) serializedLambda.getCapturedArg(2);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(3)).booleanValue();
                    return () -> {
                        return new AggClientConnection(actorRef, inetSocketAddress, finiteDuration, booleanValue);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
