package co.paralleluniverse.galaxy.jgroups;

import co.paralleluniverse.common.collection.ConcurrentMultimap;
import co.paralleluniverse.common.collection.Util;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.core.AbstractComm;
import co.paralleluniverse.galaxy.core.Comm;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.MessageReceiver;
import co.paralleluniverse.galaxy.core.NodeNotFoundException;
import gnu.trove.set.hash.TShortHashSet;
import java.beans.ConstructorProperties;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.ReceiverAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/paralleluniverse/galaxy/jgroups/JGroupsComm.class */
class JGroupsComm extends AbstractComm<Address> {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger LOG;
    /** Data channel obtained from the cluster in the constructor; every send in this class goes through it. */
    private final Channel channel;
    /** Comm used to talk to the server; may be {@code null} (see the null checks in {@code setReceiver} and {@code init}). */
    private final Comm serverComm;
    /** Messages sent to a specific node that still await a reply, keyed by node; new entries are added at the head. */
    private final ConcurrentMultimap<Short, Message, Deque<Message>> pendingReply;
    /**
     * Broadcast messages still awaiting ACKs or a reply, keyed by message id. It is only allocated in
     * {@code postInit()} when {@code sendToServerInsteadOfMulticast} is false, so it may be {@code null}.
     */
    private ConcurrentMap<Long, BroadcastEntry> pendingBroadcasts;
    /** Decompiler-exposed flag behind the hand-expanded asserts; {@code true} when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/paralleluniverse/galaxy/jgroups/JGroupsComm$BroadcastEntry.class */
    /**
     * Bookkeeping for one outstanding broadcast: the message itself plus the set of nodes
     * that have not been removed (acknowledged or dropped) yet.
     */
    public static class BroadcastEntry {
        /** The broadcast message being tracked. */
        final Message.LineMessage message;
        /** Nodes still being waited on; mutated only through the synchronized sections below. */
        final TShortHashSet nodes;

        /**
         * Starts tracking {@code lineMessage} against a copy of {@code set}.
         *
         * @param lineMessage the broadcast message
         * @param set         the nodes to wait for; node 0 is dropped from the copy
         *                    (presumably the server node - confirm against the cluster code)
         */
        public BroadcastEntry(Message.LineMessage lineMessage, Set<Short> set) {
            final TShortHashSet awaited = new TShortHashSet(set);
            awaited.remove((short) 0);
            this.message = lineMessage;
            this.nodes = awaited;
            JGroupsComm.LOG.debug("Awaiting ACKS for message {} from nodes {}", lineMessage, awaited);
        }

        /**
         * Adds a node to the set of nodes being waited on.
         *
         * @param s the node to add
         */
        public void addNode(short s) {
            synchronized (this) {
                this.nodes.add(s);
            }
        }

        /**
         * Removes a node from the set of nodes being waited on.
         *
         * @param s the node to remove
         * @return {@code true} if no nodes remain after the removal
         */
        public boolean removeNode(short s) {
            synchronized (this) {
                this.nodes.remove(s);
                return this.nodes.isEmpty();
            }
        }
    }

    /**
     * Creates the comm, wires it to the cluster's data channel and registers a channel receiver
     * that forwards every incoming JGroups message to {@link #receive(org.jgroups.Message)}.
     *
     * @param str     the component name, passed to the superclass
     * @param cluster the cluster; also used to build the node address resolver
     * @param comm    the server comm, or {@code null}; when non-null,
     *                {@code sendToServerInsteadOfMulticast} is switched on
     */
    @ConstructorProperties({"name", "cluster", "serverComm"})
    public JGroupsComm(String str, Cluster cluster, Comm comm) {
        super(str, cluster, new JGroupsNodeAddressResolver(cluster));
        this.serverComm = comm;
        // The empty deque is handed to the multimap constructor; per-key deques come from allocateElement().
        this.pendingReply = new ConcurrentMultimap<Short, Message, Deque<Message>>(new ArrayDeque<Message>(0)) { // from class: co.paralleluniverse.galaxy.jgroups.JGroupsComm.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // co.paralleluniverse.common.collection.ConcurrentMapComplex
            public Deque<Message> allocateElement() {
                return new ConcurrentLinkedDeque<Message>();
            }
        };
        this.channel = getCluster().getDataChannel();
        this.channel.setReceiver(new ReceiverAdapter() { // from class: co.paralleluniverse.galaxy.jgroups.JGroupsComm.2
            public void receive(org.jgroups.Message message) {
                JGroupsComm.this.receive(message);
            }
        });
        this.sendToServerInsteadOfMulticast = (comm != null);
    }

    /**
     * Installs the receiver on this comm and, when a server comm is configured, on that comm too.
     *
     * @param messageReceiver the receiver to install
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm, co.paralleluniverse.galaxy.core.Comm
    public void setReceiver(MessageReceiver messageReceiver) {
        super.setReceiver(messageReceiver);
        final Comm server = this.serverComm;
        if (server == null) {
            return;
        }
        server.setReceiver(messageReceiver);
    }

    /**
     * Runs the superclass initialization, then validates the configuration: sending to the
     * server instead of multicasting requires a server comm.
     *
     * @throws RuntimeException if {@code sendToServerInsteadOfMulticast} is set but no server comm was given
     * @throws Exception        whatever the superclass {@code init()} throws
     */
    @Override // co.paralleluniverse.common.spring.Service, co.paralleluniverse.common.spring.Component
    public void init() throws Exception {
        super.init();
        final boolean serverMissing = (this.serverComm == null);
        if (serverMissing && this.sendToServerInsteadOfMulticast) {
            throw new RuntimeException("sendToServerInsteadOfBroadcast is set to true but no serverComm set");
        }
    }

    /**
     * Allocates the pending-broadcast table when this comm multicasts (i.e. does not route
     * through the server), then delegates to the superclass.
     *
     * @throws Exception whatever the superclass {@code postInit()} throws
     */
    @Override // co.paralleluniverse.galaxy.core.ClusterService, co.paralleluniverse.common.spring.Service, co.paralleluniverse.common.spring.Component
    public void postInit() throws Exception {
        final boolean multicasts = !this.sendToServerInsteadOfMulticast;
        if (multicasts) {
            this.pendingBroadcasts = new ConcurrentHashMap<Long, BroadcastEntry>();
        }
        super.postInit();
    }

    /**
     * Schedules the periodic timeout sweep (every half timeout, starting immediately) and marks
     * the service ready.
     *
     * <p>Each sweep expires timed-out pending broadcasts, expires timed-out pending replies and,
     * if broadcasts are still pending, sends an empty broadcast.
     *
     * <p>The sweep catches runtime exceptions: assuming {@code getScheduler()} is a
     * {@code ScheduledExecutorService} (confirm), an exception escaping a fixed-rate task
     * suppresses every later execution, which would silently disable timeouts.
     *
     * @param z not used by this implementation
     */
    @Override // co.paralleluniverse.galaxy.core.ClusterService
    protected void start(boolean z) {
        final long timeoutNanos = TimeUnit.NANOSECONDS.convert(getTimeout(), TimeUnit.MILLISECONDS);
        getScheduler().scheduleAtFixedRate(new Runnable() { // from class: co.paralleluniverse.galaxy.jgroups.JGroupsComm.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    final long now = System.nanoTime();
                    JGroupsComm.this.expireTimedOutBroadcasts(now, timeoutNanos);
                    JGroupsComm.this.expireTimedOutReplies(now, timeoutNanos);
                    JGroupsComm.this.sendFlushIfBroadcastsPending();
                } catch (RuntimeException e) {
                    // Keep the periodic task alive; the next sweep retries.
                    JGroupsComm.LOG.error("Error while checking for timed-out messages.", e);
                }
            }
        }, 0L, getTimeout() / 2, TimeUnit.MILLISECONDS);
        setReady(true);
    }

    /** Delivers a TIMEOUT for every pending broadcast (other than INVACK) older than {@code timeoutNanos}. */
    private void expireTimedOutBroadcasts(long now, long timeoutNanos) {
        final ConcurrentMap<Long, BroadcastEntry> broadcasts = this.pendingBroadcasts;
        if (broadcasts == null) {
            return; // table is only allocated when multicasting
        }
        for (BroadcastEntry entry : broadcasts.values()) {
            final Message.LineMessage lineMessage = entry.message;
            if (lineMessage.getType() == Message.Type.INVACK || now - lineMessage.getTimestamp() <= timeoutNanos) {
                continue;
            }
            // Only the caller that actually removes the entry reports the timeout.
            if (broadcasts.remove(Long.valueOf(lineMessage.getMessageId())) != null) {
                LOG.debug("Timeout on message {}", lineMessage);
                receive(Message.TIMEOUT(lineMessage).setIncoming());
            }
        }
    }

    /** Delivers a TIMEOUT for every pending reply (other than INVACK) older than {@code timeoutNanos}. */
    private void expireTimedOutReplies(long now, long timeoutNanos) {
        for (Deque<Message> pending : this.pendingReply.values()) {
            for (Message message : Util.reverse(pending)) {
                if (message.getType() == Message.Type.INVACK || now - message.getTimestamp() <= timeoutNanos) {
                    continue;
                }
                // Only the caller that actually removes the message reports the timeout.
                if (pending.removeLastOccurrence(message)) {
                    LOG.debug("Timeout on message {}", message);
                    receive(Message.TIMEOUT((Message.LineMessage) message).setIncoming());
                }
            }
        }
    }

    /** Sends an empty broadcast while broadcasts are pending; receivers drop empty payloads in {@code receive}. */
    private void sendFlushIfBroadcastsPending() {
        // Checked here as well so a null table (send-to-server mode) never reaches hasPendingBroadcasts().
        if (this.pendingBroadcasts == null || !hasPendingBroadcasts()) {
            return;
        }
        try {
            this.channel.send(new org.jgroups.Message((Address) null, new byte[0]));
        } catch (Exception e) {
            LOG.error("Error while broadcasting flush.", e);
        }
    }

    /**
     * Returns the cluster narrowed to its JGroups implementation.
     *
     * @return the superclass's cluster, cast to {@link JGroupsCluster}
     */
    @Override // co.paralleluniverse.galaxy.core.ClusterService
    public final JGroupsCluster getCluster() {
        final Cluster cluster = super.getCluster();
        return (JGroupsCluster) cluster;
    }

    /**
     * Tells whether any broadcast is still awaiting ACKs or a reply.
     *
     * <p>The pending-broadcast table is only allocated in {@code postInit()} when this comm
     * multicasts, so a missing table is treated as "nothing pending" instead of throwing.
     *
     * @return {@code true} if the table exists and is non-empty
     */
    protected boolean hasPendingBroadcasts() {
        final ConcurrentMap<Long, BroadcastEntry> broadcasts = this.pendingBroadcasts;
        return broadcasts != null && !broadcasts.isEmpty();
    }

    /**
     * Records {@code message} as awaiting a response, if it needs one.
     *
     * <ul>
     *   <li>Messages that do not require a response are not recorded.</li>
     *   <li>For a non-negative node, the message is pushed at the head of that node's pending-reply deque.</li>
     *   <li>Otherwise the message is treated as a broadcast and recorded against the current
     *       cluster nodes; if the cluster has no nodes, or only node 0, a NOT_FOUND is delivered
     *       locally instead.</li>
     * </ul>
     *
     * @param message the outgoing message
     * @param s       the destination node, or a negative value for a broadcast
     * @return {@code false} only when NOT_FOUND was delivered locally; {@code true} otherwise
     */
    protected boolean addToPending(Message message, short s) {
        final boolean expectsResponse = message.getType().isOf(Message.Type.REQUIRES_RESPONSE);
        if (!expectsResponse) {
            LOG.debug("Message {} does not require a response.", message);
            return true;
        }

        final boolean unicast = s >= 0;
        if (unicast) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Enqueing message in pending-replies {}", message);
            }
            final Deque<Message> pending = this.pendingReply.getOrAllocate(Short.valueOf(s));
            pending.addFirst(message);
            return true;
        }

        // Hand-expanded asserts (decompiler form): a message reaching this point must be a broadcast line message.
        if (!$assertionsDisabled) {
            if (!message.isBroadcast()) {
                throw new AssertionError();
            }
            if (!(message instanceof Message.LineMessage)) {
                throw new AssertionError();
            }
        }

        final Set<Short> clusterNodes = getCluster().getNodes();
        if (!(message instanceof Message.LineMessage)) {
            return true;
        }
        final Message.LineMessage lineMessage = (Message.LineMessage) message;

        final boolean noPeers = clusterNodes.isEmpty()
                || (clusterNodes.size() == 1 && clusterNodes.contains((short) 0));
        if (noPeers) {
            LOG.debug("No other nodes in cluster. Responding with NOT_FOUND to message {}", message);
            receive(Message.NOT_FOUND(lineMessage).setIncoming());
            return false;
        }

        this.pendingBroadcasts.put(Long.valueOf(message.getMessageId()), new BroadcastEntry(lineMessage, clusterNodes));
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends {@code message} to a single node over the data channel, after assigning it an id and
     * recording it as pending. Send failures are logged and not rethrown.
     *
     * @param message the message to send
     * @param s       the destination node
     * @param address the destination's JGroups address
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm
    public void sendToNode(Message message, short s, Address address) {
        assignMessageId(message);
        addToPending(message, s);
        try {
            if (LOG.isDebugEnabled()) {
                final Object[] logArgs = {Short.valueOf(s), address, message};
                LOG.debug("Sending to node {} ({}): {}", logArgs);
            }
            final byte[] payload = message.toByteArray();
            this.channel.send(new org.jgroups.Message(address, payload));
        } catch (Exception sendFailure) {
            LOG.error("Error while sending message " + message + " to node " + s, sendFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass handling, then forwards {@code message} through the server comm.
     *
     * @param message the message to send to the server
     * @throws RuntimeException wrapping the {@link NodeNotFoundException} if the server comm cannot find the server
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm
    public void sendToServer(Message message) {
        super.sendToServer(message);
        final Comm server = this.serverComm;
        try {
            server.send(message);
        } catch (NodeNotFoundException notFound) {
            throw new RuntimeException("Server not found!", notFound);
        }
    }

    /**
     * Broadcasts {@code message} to all nodes over the data channel (null destination address).
     *
     * <p>The message is first given an id and recorded as pending. {@code addToPending} returns
     * {@code false} only when it has already answered with NOT_FOUND because there are no other
     * nodes; in that case nothing is sent. The previous code had this test inverted: it called
     * itself when {@code addToPending} returned {@code true} (unbounded recursion) and still
     * sent the message after the NOT_FOUND case.
     *
     * <p>Send failures are logged and not rethrown.
     *
     * @param message the message to broadcast
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm
    protected void broadcast(Message message) {
        assignMessageId(message);
        if (!addToPending(message, (short) -1)) {
            return; // already answered locally with NOT_FOUND
        }
        try {
            LOG.debug("Broadcasting (null): {}", message);
            this.channel.send(new org.jgroups.Message((Address) null, message.toByteArray()));
        } catch (Exception e) {
            LOG.error("Error while broadcasting message " + message, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a raw JGroups message from the data channel.
     *
     * <p>Messages whose source equals this node's own address, and messages with an empty (or
     * absent) payload, are ignored. Otherwise the payload is decoded; if it is a response, the
     * matching pending-reply and pending-broadcast bookkeeping is cleared. Finally the sender's
     * address is resolved to a node, set on the message, and the message is passed to the
     * message-level {@code receive}.
     *
     * <p>Any exception is logged and not rethrown.
     *
     * @param message the incoming JGroups message
     */
    public void receive(org.jgroups.Message message) {
        try {
            LOG.debug("Received {}", message);
            final Address self = getCluster().getMyAddress();
            final Address src = message.getSrc();
            if (self != null && src != null && self.equals(src)) {
                return; // our own message coming back to us
            }

            final byte[] rawBuffer = message.getRawBuffer();
            // A null buffer is treated like the empty payload sent by the periodic flush in start().
            if (rawBuffer == null || rawBuffer.length == 0) {
                return;
            }

            final Message decoded = Message.fromByteArray(rawBuffer);
            if (decoded.isResponse()) {
                // NOTE(review): getNode() is read here before setNode() below assigns the resolved sender;
                // confirm fromByteArray() populates the node as this lookup expects.
                final Deque<Message> pending = this.pendingReply.get(Short.valueOf(decoded.getNode()));
                if (pending != null && pending.removeLastOccurrence(decoded)) {
                    LOG.debug("Message {} is a reply! (removing from pending)", decoded);
                }

                // The broadcast table only exists when multicasting (see postInit()); without this
                // guard every response in send-to-server mode died with a NullPointerException.
                final ConcurrentMap<Long, BroadcastEntry> broadcasts = this.pendingBroadcasts;
                final Long messageId = Long.valueOf(decoded.getMessageId());
                if (broadcasts != null && broadcasts.get(messageId) != null) {
                    if (decoded.getType() != Message.Type.ACK) {
                        LOG.debug("Message {} is a reply to a broadcast! (discarding pending)", decoded);
                        broadcasts.remove(messageId);
                    } else {
                        removeFromPendingBroadcasts(decoded.getMessageId(), decoded.getNode());
                    }
                }
            }

            final short node = getNode(src);
            if (node < 0) {
                throw new RuntimeException("Node not found for source address " + src);
            }
            decoded.setNode(node);
            receive(decoded);
        } catch (Exception e) {
            LOG.error("Error receiving message", e);
        }
    }

    /**
     * After the superclass handling, re-sends every message still pending a reply from the added
     * node, iterating that node's deque through {@code Util.reverse}.
     *
     * @param s the node that was added
     * @throws AssertionError if the re-send reports the node as not found
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm, co.paralleluniverse.galaxy.cluster.NodeChangeListener
    public void nodeAdded(short s) {
        super.nodeAdded(s);
        try {
            final Deque<Message> pending = this.pendingReply.get(Short.valueOf(s));
            for (Message unanswered : Util.reverse(pending)) {
                sendToNode(unanswered, s);
            }
        } catch (NodeNotFoundException e) {
            throw new AssertionError();
        }
    }

    /**
     * After the superclass handling, re-sends to the switched node every message still pending a
     * reply from it, followed by every pending broadcast.
     *
     * <p>The pending-broadcast table only exists when this comm multicasts (see
     * {@code postInit()}); it is skipped when absent instead of throwing a
     * {@link NullPointerException}.
     *
     * @param s the node that was switched
     * @throws AssertionError if a re-send reports the node as not found
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm, co.paralleluniverse.galaxy.cluster.NodeChangeListener
    public void nodeSwitched(short s) {
        super.nodeSwitched(s);
        try {
            final Deque<Message> pending = this.pendingReply.get(Short.valueOf(s));
            for (Message unanswered : Util.reverse(pending)) {
                sendToNode(unanswered, s);
            }
            final ConcurrentMap<Long, BroadcastEntry> broadcasts = this.pendingBroadcasts;
            if (broadcasts != null) {
                for (BroadcastEntry entry : broadcasts.values()) {
                    sendToNode(entry.message, s);
                }
            }
        } catch (NodeNotFoundException e) {
            throw new AssertionError();
        }
    }

    /**
     * After the superclass handling, forgets everything pending on the removed node: its
     * pending-reply deque is dropped and it is removed from every pending broadcast.
     *
     * <p>The pending-broadcast table only exists when this comm multicasts (see
     * {@code postInit()}); it is skipped when absent instead of throwing a
     * {@link NullPointerException}.
     *
     * @param s the node that was removed
     */
    @Override // co.paralleluniverse.galaxy.core.AbstractComm, co.paralleluniverse.galaxy.cluster.NodeChangeListener
    public void nodeRemoved(short s) {
        super.nodeRemoved(s);
        this.pendingReply.remove(Short.valueOf(s));
        final ConcurrentMap<Long, BroadcastEntry> broadcasts = this.pendingBroadcasts;
        if (broadcasts == null) {
            return;
        }
        for (Long messageId : broadcasts.keySet()) {
            removeFromPendingBroadcasts(messageId.longValue(), s);
        }
    }

    /**
     * Removes node {@code s} from the nodes awaited for broadcast {@code j}. When no awaited
     * nodes remain, a NOT_FOUND is delivered locally and the broadcast is dropped from the table.
     *
     * <p>Does nothing if the table is absent or the broadcast is no longer in it. Entries are
     * also removed by the timeout sweep and by non-ACK replies, so the lookup can come back
     * empty; the previous code then failed with a {@link NullPointerException}.
     *
     * @param j the broadcast's message id
     * @param s the node to stop waiting for
     */
    private void removeFromPendingBroadcasts(long j, short s) {
        final ConcurrentMap<Long, BroadcastEntry> broadcasts = this.pendingBroadcasts;
        if (broadcasts == null) {
            return;
        }
        final Long messageId = Long.valueOf(j);
        final BroadcastEntry broadcastEntry = broadcasts.get(messageId);
        if (broadcastEntry == null) {
            return; // already completed, timed out or answered
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ACK from {} to message {}", Short.valueOf(s), broadcastEntry.message);
        }
        if (broadcastEntry.removeNode(s)) {
            LOG.debug("Got all ACKs for message {}, but no response - sending NOT_FOUND to cache!", broadcastEntry.message);
            receive(Message.NOT_FOUND(broadcastEntry.message).setIncoming());
            broadcasts.remove(messageId);
        }
    }

    // Static initializer as reconstructed by the decompiler: sets the two static final fields declared at the top of the class.
    static {
        // Inverse of the class's desired assertion status; read by the hand-expanded asserts in addToPending().
        $assertionsDisabled = !JGroupsComm.class.desiredAssertionStatus();
        // Logger shared by this class and its nested/anonymous classes.
        LOG = LoggerFactory.getLogger(JGroupsComm.class);
    }
}
