package com.arpnetworking.clusteraggregator;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ShardRegion;
import com.arpnetworking.clusteraggregator.models.ShardAllocation;
import com.arpnetworking.metrics.Metrics;
import com.arpnetworking.metrics.MetricsFactory;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.arpnetworking.utility.ParallelLeastShardAllocationStrategy;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import scala.compat.java8.OptionConverters;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/arpnetworking/clusteraggregator/ClusterStatusCache.class */
public class ClusterStatusCache extends AbstractActor {
    /** Cluster extension of the actor system; queried for the current cluster state and this node's address. */
    private final Cluster _cluster;
    /** Cluster sharding extension; used to enumerate shard type names and reach their shard regions. */
    private final ClusterSharding _sharding;
    /** Interval between self-sent poll messages; also passed along with each shard statistics request. */
    private final Duration _pollInterval;
    /** Factory for the short-lived metrics instances that gauges are recorded into. */
    private final MetricsFactory _metricsFactory;

    /** Handle of the recurring poll schedule; assigned in preStart and cancelled in postStop. */
    @Nullable
    private Cancellable _pollTimer;
    /** Message the actor schedules to itself to trigger a poll. */
    private static final String POLL = "poll";
    /** Class logger; assigned in the static initializer. */
    private static final Logger LOGGER;
    // Static join points populated by ajc$preClinit(); ajc$tjp_0 is used when logging in
    // processShardingStats and ajc$tjp_1 when logging in the poll handler.
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    /** Most recently received cluster state; empty until the first one arrives. */
    private Optional<ClusterEvent.CurrentClusterState> _clusterState = Optional.empty();
    /** Most recently received rebalance notification; empty until the first one arrives. */
    private Optional<ParallelLeastShardAllocationStrategy.RebalanceNotification> _rebalanceState = Optional.empty();

    /**
     * Message asking the {@link ClusterStatusCache} actor for its cached status. The actor
     * answers the sender with a {@link StatusResponse}. The message carries no payload.
     */
    public static final class GetRequest implements Serializable {
        private static final long serialVersionUID = 2804853560963013618L;
    }

    /**
     * Reply to a {@link GetRequest}. Carries the cluster state and, when a rebalance
     * notification is available, one {@link ShardAllocation} per shard region that either
     * currently holds shards or is the target of a pending rebalance.
     */
    public static final class StatusResponse implements Serializable {

        @Nullable
        private final ClusterEvent.CurrentClusterState _clusterState;

        @SuppressFBWarnings({"SE_BAD_FIELD"})
        private final Optional<List<ShardAllocation>> _allocations;
        private static final long serialVersionUID = 603308359721162702L;

        /**
         * Builds the response from the cached state.
         *
         * @param currentClusterState the cluster state to report; may be {@code null}
         * @param optional the latest rebalance notification, if one has been received; when
         *                 empty, the allocations of this response are empty as well
         */
        public StatusResponse(@Nullable ClusterEvent.CurrentClusterState currentClusterState, Optional<ParallelLeastShardAllocationStrategy.RebalanceNotification> optional) {
            this._clusterState = currentClusterState;
            this._allocations = optional.map(StatusResponse::computeAllocations);
        }

        /**
         * Computes the allocation summary of every shard region mentioned in the notification,
         * either as a current owner of shards or as the destination of a pending rebalance.
         */
        private static List<ShardAllocation> computeAllocations(ParallelLeastShardAllocationStrategy.RebalanceNotification notification) {
            final Map<String, ActorRef> pendingRebalances = notification.getPendingRebalances();
            final Map<ActorRef, Set<String>> currentAllocations = notification.getCurrentAllocations();
            final Set<ActorRef> regions = Sets.union(currentAllocations.keySet(), Sets.newHashSet(pendingRebalances.values()));
            return regions.stream()
                    .map(region -> computeShardAllocation(pendingRebalances, currentAllocations, region))
                    .collect(Collectors.toList());
        }

        /**
         * Splits the shards associated with one region into current, incoming and outgoing.
         *
         * @param pendingRebalances shard id to destination region of each pending rebalance
         * @param currentAllocations region to the shard ids it currently holds
         * @param region the region to summarize
         * @return the allocation summary for {@code region}
         */
        private static ShardAllocation computeShardAllocation(Map<String, ActorRef> pendingRebalances, Map<ActorRef, Set<String>> currentAllocations, ActorRef region) {
            final Set<String> allocated = currentAllocations.getOrDefault(region, Collections.emptySet());

            // Shards whose pending rebalance targets this region.
            final Set<String> incomingShards = pendingRebalances.entrySet().stream()
                    .filter(pending -> region.equals(pending.getValue()))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());

            // Shards held by this region that have a pending rebalance.
            final Set<String> outgoingShards = Sets.intersection(allocated, pendingRebalances.keySet()).immutableCopy();

            // Work on a copy: the notification is cached by the actor and reused for later
            // responses, so removing the outgoing shards from its own set would make them
            // disappear from every response after the first one.
            final Set<String> currentShards = new HashSet<>(allocated);
            currentShards.removeAll(outgoingShards);

            return (ShardAllocation) new ShardAllocation.Builder()
                    .setCurrentShards(currentShards)
                    .setIncomingShards(incomingShards)
                    .setOutgoingShards(outgoingShards)
                    .setHost(ClusterStatusCache.hostFromActorRef(region))
                    .setShardRegion(region)
                    .build();
        }

        /**
         * @return the cluster state captured in this response; may be {@code null}
         */
        @Nullable
        public ClusterEvent.CurrentClusterState getClusterState() {
            return this._clusterState;
        }

        /**
         * @return the per-region shard allocations, or empty when no rebalance notification
         *         was available when the response was built
         */
        public Optional<List<ShardAllocation>> getAllocations() {
            return this._allocations;
        }
    }

    static {
        // Populate the ajc$tjp_* static join point fields, then create the class logger.
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(ClusterStatusCache.class);
    }

    /**
     * Creates the {@link Props} used to instantiate a {@link ClusterStatusCache} actor.
     *
     * @param actorSystem the actor system handed to the actor's constructor
     * @param duration the poll interval handed to the actor's constructor
     * @param metricsFactory the metrics factory handed to the actor's constructor
     * @return props whose constructor arguments are the three parameters, in order
     */
    public static Props props(ActorSystem actorSystem, Duration duration, MetricsFactory metricsFactory) {
        return Props.create(ClusterStatusCache.class, actorSystem, duration, metricsFactory);
    }

    /**
     * Creates the actor, resolving the cluster and cluster sharding extensions from the
     * given actor system.
     *
     * @param actorSystem the actor system to obtain the extensions from
     * @param duration the interval between polls
     * @param metricsFactory factory for the metrics instances gauges are recorded into
     */
    public ClusterStatusCache(ActorSystem actorSystem, Duration duration, MetricsFactory metricsFactory) {
        _cluster = Cluster.get(actorSystem);
        _sharding = ClusterSharding.get(actorSystem);
        _pollInterval = duration;
        _metricsFactory = metricsFactory;
    }

    /**
     * Schedules the recurring {@code POLL} message to this actor: first delivery with zero
     * delay, then once per poll interval. The schedule handle is kept so that
     * {@code postStop} can cancel it.
     */
    @Override
    public void preStart() {
        final ActorSystem system = getContext().system();
        final FiniteDuration initialDelay = scala.concurrent.duration.Duration.apply(0L, TimeUnit.SECONDS);
        final FiniteDuration interval = scala.concurrent.duration.Duration.apply(_pollInterval.toMillis(), TimeUnit.MILLISECONDS);
        _pollTimer = system.scheduler().schedule(
                initialDelay,
                interval,
                getSelf(),
                POLL,
                system.dispatcher(),
                getSelf());
    }

    /**
     * Cancels the poll schedule, if one was created.
     *
     * @throws Exception declared by the overridden lifecycle hook
     */
    @Override
    public void postStop() throws Exception {
        final Cancellable timer = _pollTimer;
        if (timer == null) {
            return;
        }
        timer.cancel();
    }

    /**
     * Defines the actor's message handling.
     * <ul>
     *   <li>{@code CurrentClusterState}: cached, and membership/leader gauges are recorded.</li>
     *   <li>{@code ClusterShardingStats}: per-address shard and actor gauges are recorded.</li>
     *   <li>{@link GetRequest}: the sender is answered with a {@link StatusResponse}.</li>
     *   <li>{@code RebalanceNotification}: cached for later responses and gauges.</li>
     *   <li>{@code "poll"}: when sent by this actor itself, refreshes the cluster state,
     *       requests shard statistics and records rebalance gauges.</li>
     * </ul>
     *
     * @return the receive behavior of this actor
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(ClusterEvent.CurrentClusterState.class, this::processClusterState)
                .match(ShardRegion.ClusterShardingStats.class, this::processShardingStats)
                .match(GetRequest.class, getRequest -> {
                    sendResponse(getSender());
                })
                .match(ParallelLeastShardAllocationStrategy.RebalanceNotification.class, rebalanceNotification -> {
                    _rebalanceState = Optional.of(rebalanceNotification);
                })
                .matchEquals(POLL, this::processPoll)
                .build();
    }

    /** Caches the received cluster state and records the member count and leadership gauges. */
    private void processClusterState(ClusterEvent.CurrentClusterState currentClusterState) {
        _clusterState = Optional.of(currentClusterState);
        try (Metrics metrics = _metricsFactory.create()) {
            metrics.setGauge("akka/members_count", currentClusterState.members().size());
            final boolean isLeader = _cluster.selfAddress().equals(currentClusterState.getLeader());
            metrics.setGauge("akka/is_leader", isLeader ? 1L : 0L);
        }
    }

    /** Handles a poll tick: refreshes cluster state, requests shard statistics, records rebalance gauges. */
    private void processPoll(String message) {
        // Only ticks this actor sent to itself are honored.
        if (!self().equals(sender())) {
            unhandled(message);
            return;
        }

        // The cluster answers with a CurrentClusterState message handled by processClusterState.
        _cluster.sendCurrentClusterState(getSelf());

        for (String shardType : _sharding.getShardTypeNames()) {
            LogBuilder logBuilder = LOGGER.debug().setMessage("Requesting shard statistics").addData("shardType", shardType);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, logBuilder));
            logBuilder.log();
            _sharding.shardRegion(shardType).tell(
                    new ShardRegion.GetClusterShardingStats(FiniteDuration.fromNanos(_pollInterval.toNanos())),
                    self());
        }

        // Both gauges report zero until a rebalance notification has been received.
        final int inflight = _rebalanceState.map(state -> state.getInflightRebalances().size()).orElse(0);
        final int pending = _rebalanceState.map(state -> state.getPendingRebalances().size()).orElse(0);
        try (Metrics metrics = _metricsFactory.create()) {
            metrics.setGauge("akka/cluster/rebalance/inflight", inflight);
            metrics.setGauge("akka/cluster/rebalance/pending", pending);
        }
    }

    /**
     * Records, for every region address in the received statistics, a gauge with the number
     * of shards and, when any numeric per-shard value was present, a gauge with their sum.
     * Each address is reported through its own metrics instance annotated with the address.
     *
     * @param clusterShardingStats the cluster-wide shard statistics received from a shard region
     */
    private void processShardingStats(ShardRegion.ClusterShardingStats clusterShardingStats) {
        LogBuilder logBuilder = LOGGER.debug().setMessage("Processing shard statistics").addData("regionCount", Integer.valueOf(clusterShardingStats.getRegions().size()));
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, logBuilder));
        logBuilder.log();

        final Map<String, Integer> shardsPerAddress = new HashMap<>();
        final Map<String, Long> actorsPerAddress = new HashMap<>();
        for (Map.Entry<Address, ShardRegion.ShardRegionStats> region : clusterShardingStats.getRegions().entrySet()) {
            final String address = region.getKey().hostPort();
            shardsPerAddress.put(address, region.getValue().getStats().size());
            for (Object count : region.getValue().getStats().values()) {
                // Non-numeric values are skipped, so an address may end up without an actor total.
                if (count instanceof Number) {
                    actorsPerAddress.merge(address, ((Number) count).longValue(), Long::sum);
                }
            }
        }

        for (Map.Entry<String, Integer> shards : shardsPerAddress.entrySet()) {
            try (Metrics metrics = _metricsFactory.create()) {
                metrics.addAnnotation("address", shards.getKey());
                metrics.setGauge("akka/cluster/shards", shards.getValue().intValue());
                final Long actors = actorsPerAddress.get(shards.getKey());
                if (actors != null) {
                    metrics.setGauge("akka/cluster/actors", actors.longValue());
                }
            }
        }
    }

    /**
     * Sends a {@link StatusResponse} to the given actor, built from the cached cluster state
     * (or the cluster's own state when none has been cached yet) and the cached rebalance
     * notification.
     *
     * @param actorRef the actor to reply to
     */
    private void sendResponse(ActorRef actorRef) {
        final ClusterEvent.CurrentClusterState fallbackState = _cluster.state();
        final StatusResponse response = new StatusResponse(_clusterState.orElse(fallbackState), _rebalanceState);
        actorRef.tell(response, self());
    }

    /**
     * Extracts the host of the address in an actor reference's path.
     *
     * @param actorRef the actor reference to inspect
     * @return the host of the reference's address, or {@code "localhost"} when the address
     *         carries no host
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static String hostFromActorRef(ActorRef actorRef) {
        final Optional<String> host = OptionConverters.toJava(actorRef.path().address().host());
        return host.orElse("localhost");
    }

    // Synthetic initializer invoked from the static block; judging by the ajc$ prefix it was
    // presumably generated by the AspectJ weaver rather than written by hand. It creates the
    // two static join points describing calls to LogBuilder.log(): ajc$tjp_0 is tagged with
    // line 169 and used in processShardingStats, ajc$tjp_1 is tagged with line 143 and used
    // in the poll handler. The line numbers refer to "ClusterStatusCache.java", not this file.
    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("ClusterStatusCache.java", ClusterStatusCache.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 169);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 143);
    }
}
