package com.arpnetworking.commons.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.cluster.sharding.ShardCoordinator;
import akka.dispatch.Futures;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import play.core.enhancers.PropertiesEnhancer;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.concurrent.Future;

@PropertiesEnhancer.GeneratedAccessor
@PropertiesEnhancer.RewrittenAccessor
/* loaded from: input_file:com/arpnetworking/commons/akka/ParallelLeastShardAllocationStrategy.class */
public final class ParallelLeastShardAllocationStrategy extends ShardCoordinator.AbstractShardAllocationStrategy {
    /**
     * Destination region for each shard this strategy has selected for rebalancing, keyed by shard id.
     * Entries are added in {@code rebalance}, pruned there to the shards still reported as in progress,
     * and consulted by {@code allocateShard} so that a shard lands on the region chosen for it.
     */
    private Map<String, ActorRef> _pendingRebalances = Maps.newHashMap();
    /** Upper bound on rebalances per round; the in-progress count is subtracted from it in {@code rebalance}. */
    private final int _maxParallel;
    /** Minimum difference in effective shard count between the most and least loaded region to move a shard. */
    private final int _rebalanceThreshold;
    /** Optional actor selection that is told a {@link RebalanceNotification} at the end of each {@code rebalance} call. */
    private final Optional<ActorSelection> _notify;
    /** Class logger; assigned in the static initializer. */
    private static final Logger LOGGER;
    // Static join point descriptors for the three LogBuilder.log() call sites in rebalance();
    // populated by ajc$preClinit().
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;

    @PropertiesEnhancer.GeneratedAccessor
    @PropertiesEnhancer.RewrittenAccessor
    /* loaded from: input_file:com/arpnetworking/commons/akka/ParallelLeastShardAllocationStrategy$RebalanceNotification.class */
    public static final class RebalanceNotification implements Serializable {
        private final ImmutableMap<ActorRef, Set<String>> _currentAllocations;
        private final ImmutableSet<String> _inflightRebalances;
        private final ImmutableMap<String, ActorRef> _pendingRebalances;
        private final ZonedDateTime _timestamp = ZonedDateTime.now();
        private static final long serialVersionUID = 1;

        public RebalanceNotification(Map<ActorRef, Set<String>> map, Set<String> set, Map<String, ActorRef> map2) {
            this._currentAllocations = ImmutableMap.copyOf(map);
            this._inflightRebalances = ImmutableSet.copyOf(set);
            this._pendingRebalances = ImmutableMap.copyOf(map2);
        }

        public Map<ActorRef, Set<String>> getCurrentAllocations() {
            return this._currentAllocations;
        }

        public Set<String> getInflightRebalances() {
            return this._inflightRebalances;
        }

        public ZonedDateTime getTimestamp() {
            return this._timestamp;
        }

        public Map<String, ActorRef> getPendingRebalances() {
            return this._pendingRebalances;
        }
    }

    /**
     * Mutable bookkeeping for one shard region during a single rebalance pass: the shards that can
     * still be moved away from the region plus a count of shards scheduled to move onto it.
     */
    @PropertiesEnhancer.GeneratedAccessor
    @PropertiesEnhancer.RewrittenAccessor
    /* loaded from: input_file:com/arpnetworking/commons/akka/ParallelLeastShardAllocationStrategy$RegionShardAllocations.class */
    private static final class RegionShardAllocations {
        private final ActorRef _region;
        private final Set<String> _shards;
        private int _incomingShardsCount = 0;

        /**
         * @param actorRef the shard region being tracked
         * @param set shard ids attributed to the region; copied, so the argument is never modified
         */
        private RegionShardAllocations(ActorRef actorRef, Set<String> set) {
            _region = actorRef;
            _shards = Sets.newHashSet(set);
        }

        // Compiler-generated bridge that lets the enclosing class reach the private constructor;
        // the third argument is unused.
        /* synthetic */ RegionShardAllocations(ActorRef actorRef, Set set, RegionShardAllocations regionShardAllocations) {
            this(actorRef, set);
        }

        /**
         * @return the tracked shard region
         */
        public ActorRef getRegion() {
            return _region;
        }

        /**
         * @return read-only view of the shard ids still attributed to this region
         */
        public Set<String> getShards() {
            return Collections.unmodifiableSet(_shards);
        }

        /**
         * @return number of shards still attributed to the region plus the number recorded as incoming
         */
        public int getEffectiveShardCount() {
            final int current = _shards.size();
            return current + _incomingShardsCount;
        }

        /**
         * Stops attributing the given shard to this region.
         *
         * @param str the shard id to remove
         */
        public void removeShard(String str) {
            _shards.remove(str);
        }

        /**
         * Records that one more shard is scheduled to move onto this region.
         */
        public void incrementIncoming() {
            _incomingShardsCount += 1;
        }
    }

    // Class initialization: build the static join point descriptors first, then create the logger.
    static {
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(ParallelLeastShardAllocationStrategy.class);
    }

    /**
     * Creates the allocation strategy.
     *
     * @param i maximum number of rebalances per round (stored as {@code _maxParallel})
     * @param i2 minimum difference in effective shard count between the most and least loaded
     *           region required to move a shard (stored as {@code _rebalanceThreshold})
     * @param optional optional actor selection that is told a {@link RebalanceNotification} after
     *                 each rebalance pass
     */
    public ParallelLeastShardAllocationStrategy(int i, int i2, Optional<ActorSelection> optional) {
        _notify = optional;
        _rebalanceThreshold = i2;
        _maxParallel = i;
    }

    /**
     * Chooses the region that should host a shard.
     *
     * <p>If a rebalance destination has been recorded for the shard, that region is returned.
     * Otherwise the region currently holding the fewest shards is returned; on a tie the first such
     * entry in the map's iteration order wins.
     *
     * @param actorRef the requesting region (not used by this strategy)
     * @param str the id of the shard to allocate
     * @param map current shard ids per region
     * @return an already-completed future holding the chosen region
     * @throws java.util.NoSuchElementException if {@code map} is empty and no destination is pending
     */
    public Future<ActorRef> allocateShard(ActorRef actorRef, String str, Map<ActorRef, IndexedSeq<String>> map) {
        if (_pendingRebalances.containsKey(str)) {
            return Futures.successful(_pendingRebalances.get(str));
        }
        final ActorRef leastLoaded = map.entrySet().stream()
                .min(Comparator.comparingInt(entry -> JavaConversions.seqAsJavaList(entry.getValue()).size()))
                .get()
                .getKey();
        return Futures.successful(leastLoaded);
    }

    /**
     * Selects shards to rebalance, repeatedly moving one shard from the most loaded region to the
     * least loaded one.
     *
     * <p>At most {@code _maxParallel - set.size()} shards are selected per call. Selection stops early
     * when fewer than two regions are available, when the difference in effective shard count between
     * the most and least loaded region is below {@code _rebalanceThreshold}, or when the most loaded
     * region has no shard left that can be moved. The destination chosen for each selected shard is
     * recorded in {@code _pendingRebalances}. A {@link RebalanceNotification} describing the resulting
     * state is sent to the notification target, if one is configured.
     *
     * @param map current shard ids per region
     * @param set ids of the shards whose rebalance is already in progress
     * @return an already-completed future holding the ids of the shards to rebalance now
     */
    public Future<Set<String>> rebalance(Map<ActorRef, IndexedSeq<String>> map, Set<String> set) {
        // Forget destinations recorded for shards that are no longer being rebalanced.
        this._pendingRebalances.keySet().retainAll(set);

        // Order regions by effective shard count. The region is used as a tie-breaker: with a
        // count-only comparator the TreeSet treats regions with equal counts as duplicates and
        // silently drops all but one of them.
        final Comparator<RegionShardAllocations> byLoadThenRegion = Comparator
                .comparingInt(RegionShardAllocations::getEffectiveShardCount)
                .thenComparing(RegionShardAllocations::getRegion);
        final TreeSet<RegionShardAllocations> regions = new TreeSet<>(byLoadThenRegion);
        for (final Map.Entry<ActorRef, IndexedSeq<String>> entry : map.entrySet()) {
            // Shards that are already being rebalanced are not candidates for another move.
            final Set<String> movableShards = JavaConversions.seqAsJavaList(entry.getValue()).stream()
                    .filter(str -> !set.contains(str))
                    .collect(Collectors.toSet());
            regions.add(new RegionShardAllocations(entry.getKey(), movableShards));
        }

        final Set<String> shardsToRebalance = new HashSet<>();
        final int availableSlots = this._maxParallel - set.size();
        for (int i = 0; i < availableSlots; i++) {
            // Both entries are removed from the set while they are mutated and re-inserted
            // afterwards, so the ordering of the TreeSet stays consistent.
            final RegionShardAllocations least = regions.pollFirst();
            final RegionShardAllocations most = regions.pollLast();
            if (least == null || most == null) {
                LogBuilder message = LOGGER.trace().setMessage("Cannot rebalance shards, less than 2 shard regions found.");
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, message));
                message.log();
                break;
            }
            if (most.getEffectiveShardCount() - least.getEffectiveShardCount() < this._rebalanceThreshold) {
                LogBuilder addData = LOGGER.debug().setMessage("Not rebalancing any (more) shards, shard region with most shards already balanced with least").addData("most", Integer.valueOf(most.getEffectiveShardCount())).addData("least", Integer.valueOf(least.getEffectiveShardCount())).addData("rebalanceThreshold", Integer.valueOf(this._rebalanceThreshold));
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, addData));
                addData.log();
                break;
            }
            if (most.getShards().isEmpty()) {
                // The most loaded region's count consists only of incoming shards (or the threshold
                // is not positive), so there is nothing that can be moved away from it.
                break;
            }
            final String shard = Iterables.get(most.getShards(), 0);
            most.removeShard(shard);
            least.incrementIncoming();
            shardsToRebalance.add(shard);
            this._pendingRebalances.put(shard, least.getRegion());
            regions.add(most);
            regions.add(least);
        }

        RebalanceNotification rebalanceNotification = new RebalanceNotification(Maps.transformValues(map, indexedSeq -> {
            return Sets.newHashSet(JavaConversions.seqAsJavaList(indexedSeq));
        }), set, this._pendingRebalances);
        LogBuilder addData2 = LOGGER.trace().setMessage("Broadcasting rebalance info").addData("target", this._notify).addData("shardAllocations", rebalanceNotification);
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addData2));
        addData2.log();
        if (this._notify.isPresent()) {
            this._notify.get().tell(rebalanceNotification, ActorRef.noSender());
        }
        return Futures.successful(shardsToRebalance);
    }

    // Synthetic helper invoked from the static initializer. It builds one static join point
    // descriptor per LogBuilder.log() call site; rebalance() passes these to LogBuilderAspect.
    // NOTE(review): the ajc$ prefix suggests this is AspectJ compiler output, and the trailing
    // integers (124, 135, 166) are presumably line numbers in the original
    // ParallelLeastShardAllocationStrategy.java - confirm before relying on either.
    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("ParallelLeastShardAllocationStrategy.java", ParallelLeastShardAllocationStrategy.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 124);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 135);
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 166);
    }
}
