package cloud.orbit.actors.cluster;

import cloud.orbit.actors.cluster.impl.RedisConnectionManager;
import cloud.orbit.actors.cluster.impl.RedisKeyGenerator;
import cloud.orbit.actors.cluster.impl.RedisMsg;
import cloud.orbit.actors.cluster.impl.RedisOrbitClient;
import cloud.orbit.actors.cluster.impl.RedisShardedMap;
import cloud.orbit.concurrent.Task;
import cloud.orbit.tuples.Pair;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cloud/orbit/actors/cluster/RedisClusterPeer.class */
public class RedisClusterPeer implements ClusterPeer {
    private static Logger logger = LoggerFactory.getLogger(RedisClusterPeer.class);
    private ViewListener viewListener;
    private MessageListener messageListener;
    private String clusterName;
    private RedisClusterConfig config;
    private RedisConnectionManager redisConnectionManager;
    private NodeAddress localAddress = new NodeAddressImpl(UUID.randomUUID());
    private final ConcurrentMap<String, ConcurrentMap<?, ?>> cacheManager = new ConcurrentHashMap();

    public RedisClusterPeer(RedisClusterConfig redisClusterConfig) {
        this.config = redisClusterConfig;
    }

    public <K, V> ConcurrentMap<K, V> getCache(String str) {
        String key = RedisKeyGenerator.key("shardedMap", Pair.of("cluster", this.clusterName), Pair.of("mapName", str));
        ConcurrentMap<?, ?> concurrentMap = this.cacheManager.get(key);
        if (concurrentMap == null) {
            RedisShardedMap redisShardedMap = new RedisShardedMap(key, this.redisConnectionManager.getActorDirectoryClients(), this.config.getShardingBuckets());
            concurrentMap = this.cacheManager.putIfAbsent(key, redisShardedMap);
            if (concurrentMap == null) {
                concurrentMap = redisShardedMap;
            }
        }
        return (ConcurrentMap<K, V>) concurrentMap;
    }

    public NodeAddress localAddress() {
        return this.localAddress;
    }

    public Task<?> join(String str, String str2) {
        logger.info("Joining Redis Cluster '{}' as node '{}' [{}]...", new Object[]{str, str2, this.localAddress.asUUID().toString()});
        this.clusterName = str;
        this.redisConnectionManager = new RedisConnectionManager(this.config);
        this.redisConnectionManager.subscribeToChannel(RedisKeyGenerator.nodeKey(str, this.localAddress.toString()), (str3, obj) -> {
            receiveMessage((RedisMsg) obj);
        });
        writeMyEntry();
        syncNodes();
        return Task.done();
    }

    private void writeMyEntry() {
        String nodeKey = RedisKeyGenerator.nodeKey(this.clusterName, this.localAddress.toString());
        this.redisConnectionManager.getShardedNodeDirectoryClient(nodeKey).getRedissonClient().getBucket(nodeKey).set(this.localAddress.toString(), this.config.getNodeLifetimeSeconds().intValue(), TimeUnit.SECONDS);
    }

    private void syncNodes() {
        String nodeKey = RedisKeyGenerator.nodeKey(this.clusterName, "*");
        ArrayList<String> arrayList = new ArrayList();
        Iterator<RedisOrbitClient> it = this.redisConnectionManager.getNodeDirectoryClients().iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next().getRedissonClient().getKeys().findKeysByPattern(nodeKey));
        }
        ArrayList arrayList2 = new ArrayList();
        for (String str : arrayList) {
            arrayList2.add(new NodeAddressImpl(UUID.fromString((String) this.redisConnectionManager.getShardedNodeDirectoryClient(str).getRedissonClient().getBucket(str).get())));
        }
        this.viewListener.onViewChange(arrayList2);
    }

    public void sendMessage(NodeAddress nodeAddress, byte[] bArr) {
        RedisMsg redisMsg = new RedisMsg();
        redisMsg.setMessageContents(bArr);
        redisMsg.setSenderAddress(this.localAddress.asUUID());
        this.redisConnectionManager.sendMessageToChannel(RedisKeyGenerator.nodeKey(this.clusterName, nodeAddress.toString()), redisMsg);
    }

    public void receiveMessage(RedisMsg redisMsg) {
        Task.runAsync(() -> {
            this.messageListener.receive(new NodeAddressImpl(redisMsg.getSenderAddress()), redisMsg.getMessageContents());
        }, this.config.getCoreExecutorService()).exceptionally(th -> {
            logger.error("Error receiving message", th);
            return null;
        });
    }

    public Task pulse() {
        writeMyEntry();
        syncNodes();
        return Task.done();
    }

    public void leave() {
        String nodeKey = RedisKeyGenerator.nodeKey(this.clusterName, this.localAddress.toString());
        this.redisConnectionManager.getShardedNodeDirectoryClient(nodeKey).getRedissonClient().getBucket(nodeKey).delete();
        this.redisConnectionManager.shutdownConnections();
    }

    public void registerMessageReceiver(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    public void registerViewListener(ViewListener viewListener) {
        this.viewListener = viewListener;
    }
}
