package com.arpnetworking.clusteraggregator.aggregation;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Scheduler;
import akka.cluster.sharding.ShardRegion;
import com.arpnetworking.clusteraggregator.AggregatorLifecycle;
import com.arpnetworking.clusteraggregator.models.CombinedMetricData;
import com.arpnetworking.metrics.aggregation.protocol.Messages;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.arpnetworking.tsdcore.model.AggregatedData;
import com.arpnetworking.tsdcore.model.CalculatedValue;
import com.arpnetworking.tsdcore.model.FQDSN;
import com.arpnetworking.tsdcore.model.PeriodicData;
import com.arpnetworking.tsdcore.model.Quantity;
import com.arpnetworking.tsdcore.statistics.Statistic;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.class */
public class StreamingAggregator extends AbstractActor {
    private final ActorRef _emitter;
    private final ActorRef _lifecycleTracker;
    private final ActorRef _periodicStatistics;
    private final String _clusterHostSuffix;
    private Period _period;
    private String _cluster;
    private String _metric;
    private String _service;
    private ImmutableMap<String, String> _dimensions;
    private AggregatedData.Builder _resultBuilder;
    private static final Duration AGG_TIMEOUT;
    private static final Logger LOGGER;
    private static final Predicate<Map.Entry<String, String>> NOT_EXPLICIT_DIMENSION;
    private static final JoinPoint.StaticPart ajc$tjp_0 = null;
    private static final JoinPoint.StaticPart ajc$tjp_1 = null;
    private static final JoinPoint.StaticPart ajc$tjp_2 = null;
    private static final JoinPoint.StaticPart ajc$tjp_3 = null;
    private static final JoinPoint.StaticPart ajc$tjp_4 = null;
    private static final JoinPoint.StaticPart ajc$tjp_5 = null;
    private static final JoinPoint.StaticPart ajc$tjp_6 = null;
    private static final JoinPoint.StaticPart ajc$tjp_7 = null;
    private static final JoinPoint.StaticPart ajc$tjp_8 = null;
    private static final JoinPoint.StaticPart ajc$tjp_9 = null;
    private final LinkedList<StreamingAggregationBucket> _aggBuckets = Lists.newLinkedList();
    private final Set<Statistic> _statistics = Sets.newHashSet();
    private boolean _initialized = false;

    /* loaded from: input_file:com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator$BucketCheck.class */
    private static final class BucketCheck implements Serializable {
        private static final long serialVersionUID = 1;

        private BucketCheck() {
        }

        /* synthetic */ BucketCheck(BucketCheck bucketCheck) {
            this();
        }
    }

    /* loaded from: input_file:com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator$ShutdownAggregator.class */
    private static final class ShutdownAggregator implements Serializable {
        private static final long serialVersionUID = 1;

        private ShutdownAggregator() {
        }

        /* synthetic */ ShutdownAggregator(ShutdownAggregator shutdownAggregator) {
            this();
        }
    }

    /* loaded from: input_file:com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator$UpdateBookkeeper.class */
    private static final class UpdateBookkeeper implements Serializable {
        private static final long serialVersionUID = 1;

        private UpdateBookkeeper() {
        }

        /* synthetic */ UpdateBookkeeper(UpdateBookkeeper updateBookkeeper) {
            this();
        }
    }

    static {
        ajc$preClinit();
        AGG_TIMEOUT = Duration.standardMinutes(1L);
        LOGGER = LoggerFactory.getLogger(StreamingAggregator.class);
        NOT_EXPLICIT_DIMENSION = entry -> {
            return (((String) entry.getKey()).equals(CombinedMetricData.CLUSTER_KEY) || ((String) entry.getKey()).equals(CombinedMetricData.HOST_KEY) || ((String) entry.getKey()).equals(CombinedMetricData.SERVICE_KEY)) ? false : true;
        };
    }

    public static Props props(ActorRef actorRef, ActorRef actorRef2, ActorRef actorRef3, String str) {
        return Props.create(StreamingAggregator.class, new Object[]{actorRef, actorRef2, actorRef3, str});
    }

    @Inject
    public StreamingAggregator(@Named("bookkeeper-proxy") ActorRef actorRef, @Named("periodic-statistics") ActorRef actorRef2, @Named("cluster-emitter") ActorRef actorRef3, @Named("cluster-host-suffix") String str) {
        this._lifecycleTracker = actorRef;
        this._periodicStatistics = actorRef2;
        this._clusterHostSuffix = str;
        context().setReceiveTimeout(FiniteDuration.apply(30L, TimeUnit.MINUTES));
        Scheduler scheduler = getContext().system().scheduler();
        scheduler.schedule(FiniteDuration.apply(5L, TimeUnit.SECONDS), FiniteDuration.apply(5L, TimeUnit.SECONDS), getSelf(), new BucketCheck(null), getContext().dispatcher(), getSelf());
        scheduler.schedule(FiniteDuration.apply(5L, TimeUnit.SECONDS), FiniteDuration.apply(1L, TimeUnit.HOURS), getSelf(), new UpdateBookkeeper(null), getContext().dispatcher(), getSelf());
        this._emitter = actorRef3;
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(Messages.StatisticSetRecord.class, statisticSetRecord -> {
            LogBuilder addContext = LOGGER.debug().setMessage("Processing a StatisticSetRecord").addData("workItem", statisticSetRecord).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_8, this, addContext));
            addContext.log();
            processAggregationMessage(statisticSetRecord);
        }).match(BucketCheck.class, bucketCheck -> {
            if (this._initialized) {
                while (this._aggBuckets.size() > 0) {
                    StreamingAggregationBucket first = this._aggBuckets.getFirst();
                    if (!first.getPeriodStart().plus(this._period).plus(AGG_TIMEOUT).isBeforeNow()) {
                        return;
                    }
                    this._aggBuckets.removeFirst();
                    Map<Statistic, CalculatedValue<?>> compute = first.compute();
                    ImmutableList.Builder builder = ImmutableList.builder();
                    for (Map.Entry<Statistic, CalculatedValue<?>> entry : compute.entrySet()) {
                        this._statistics.add(entry.getKey());
                        AggregatedData aggregatedData = (AggregatedData) this._resultBuilder.setFQDSN((FQDSN) new FQDSN.Builder().setCluster(this._cluster).setMetric(this._metric).setService(this._service).setStatistic(entry.getKey()).build()).setStart(first.getPeriodStart()).setValue(entry.getValue().getValue()).setSupportingData(entry.getValue().getData()).setPopulationSize(0L).setIsSpecified(Boolean.valueOf(first.isSpecified(entry.getKey()))).build();
                        LogBuilder addContext = LOGGER.debug().setMessage("Computed result").addData("result", aggregatedData).addContext("actor", self());
                        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_9, this, addContext));
                        addContext.log();
                        builder.add(aggregatedData);
                        this._periodicStatistics.tell(aggregatedData, getSelf());
                    }
                    this._emitter.tell((PeriodicData) new PeriodicData.Builder().setData(builder.build()).setDimensions(this._dimensions).setConditions(ImmutableList.of()).setPeriod(this._period).setStart(first.getPeriodStart()).build(), getSelf());
                }
            }
        }).match(UpdateBookkeeper.class, updateBookkeeper -> {
            if (this._resultBuilder != null) {
                Iterator<Statistic> it = this._statistics.iterator();
                while (it.hasNext()) {
                    this._resultBuilder.setFQDSN((FQDSN) new FQDSN.Builder().setCluster(this._cluster).setMetric(this._metric).setService(this._service).setStatistic(it.next()).build());
                    this._lifecycleTracker.tell(new AggregatorLifecycle.NotifyAggregatorStarted((AggregatedData) this._resultBuilder.build()), getSelf());
                }
                this._statistics.clear();
            }
        }).match(ShutdownAggregator.class, shutdownAggregator -> {
            context().stop(self());
        }).match(ReceiveTimeout.class, receiveTimeout -> {
            getContext().parent().tell(new ShardRegion.Passivate(new ShutdownAggregator(null)), getSelf());
        }).build();
    }

    public void preRestart(Throwable th, Optional<Object> optional) throws Exception {
        LogBuilder addContext = LOGGER.error().setMessage("Aggregator crashing").setThrowable(th).addData("triggeringMessage", optional.orElse(null)).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, addContext));
        addContext.log();
        super.preRestart(th, optional);
    }

    private void processAggregationMessage(Messages.StatisticSetRecord statisticSetRecord) {
        CombinedMetricData combinedMetricData = (CombinedMetricData) CombinedMetricData.Builder.fromStatisticSetRecord(statisticSetRecord).build();
        if (!this._initialized) {
            this._period = combinedMetricData.getPeriod();
            this._cluster = combinedMetricData.getCluster();
            this._metric = combinedMetricData.getMetricName();
            this._service = combinedMetricData.getService();
            this._dimensions = dimensionsToMap(statisticSetRecord);
            this._resultBuilder = new AggregatedData.Builder().setHost(createHost()).setPeriod(this._period).setPopulationSize(1L).setSamples(Collections.emptyList()).setStart(DateTime.now().hourOfDay().roundFloorCopy()).setValue((Quantity) new Quantity.Builder().setValue(Double.valueOf(0.0d)).build());
            this._initialized = true;
            LogBuilder addContext = LOGGER.debug().setMessage("Initialized aggregator").addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, addContext));
            addContext.log();
        } else if (!this._period.equals(combinedMetricData.getPeriod()) || !this._cluster.equals(combinedMetricData.getCluster()) || !this._service.equals(combinedMetricData.getService()) || !this._metric.equals(combinedMetricData.getMetricName())) {
            LogBuilder addContext2 = LOGGER.error().setMessage("Received a work item for another aggregator").addData("workItem", statisticSetRecord).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addContext2));
            addContext2.log();
        }
        DateTime parse = DateTime.parse(statisticSetRecord.getPeriodStart());
        if (this._aggBuckets.size() > 0 && this._aggBuckets.getFirst().getPeriodStart().isAfter(parse)) {
            LogBuilder addContext3 = LOGGER.warn().setMessage("Received a work item that is too old to aggregate").addData("bucketStart", this._aggBuckets.getFirst().getPeriodStart()).addData("workItem", statisticSetRecord).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, addContext3));
            addContext3.log();
            return;
        }
        if (this._aggBuckets.size() == 0 || this._aggBuckets.getLast().getPeriodStart().isBefore(parse)) {
            LogBuilder addContext4 = LOGGER.debug().setMessage("Creating new aggregation bucket for period").addData("period", parse).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, addContext4));
            addContext4.log();
            this._aggBuckets.add(new StreamingAggregationBucket(parse));
        }
        Iterator<StreamingAggregationBucket> it = this._aggBuckets.iterator();
        StreamingAggregationBucket streamingAggregationBucket = null;
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            StreamingAggregationBucket next = it.next();
            if (next.getPeriodStart().equals(parse)) {
                streamingAggregationBucket = next;
                break;
            }
        }
        if (streamingAggregationBucket == null) {
            LogBuilder addContext5 = LOGGER.error().setMessage("No bucket found to aggregate into, bug in the bucket walk").addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_5, this, addContext5));
            addContext5.log();
            return;
        }
        LogBuilder addContext6 = LOGGER.debug().setMessage("Updating bucket").addData("bucket", streamingAggregationBucket).addData("data", combinedMetricData).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_6, this, addContext6));
        addContext6.log();
        streamingAggregationBucket.update(combinedMetricData);
        LogBuilder addContext7 = LOGGER.debug().setMessage("Done updating bucket").addData("bucket", streamingAggregationBucket).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_7, this, addContext7));
        addContext7.log();
    }

    private ImmutableMap<String, String> dimensionsToMap(Messages.StatisticSetRecord statisticSetRecord) {
        ImmutableMap.Builder put = ImmutableMap.builder().put(CombinedMetricData.CLUSTER_KEY, statisticSetRecord.getCluster()).put(CombinedMetricData.SERVICE_KEY, statisticSetRecord.getService()).put(CombinedMetricData.HOST_KEY, createHost());
        statisticSetRecord.getDimensionsMap().entrySet().stream().filter(NOT_EXPLICIT_DIMENSION).forEach(entry -> {
            put.put((String) entry.getKey(), (String) entry.getValue());
        });
        return put.build();
    }

    private String createHost() {
        return String.valueOf(this._cluster) + "-cluster" + this._clusterHostSuffix;
    }

    private static void ajc$preClinit() {
        Factory factory = new Factory("StreamingAggregator.java", StreamingAggregator.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 206);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 232);
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 241);
        ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 252);
        ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 260);
        ajc$tjp_5 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 279);
        ajc$tjp_6 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 286);
        ajc$tjp_7 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 292);
        ajc$tjp_8 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 125);
        ajc$tjp_9 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 158);
    }
}
