package co.paralleluniverse.galaxy.core;

import co.paralleluniverse.common.collection.ConcurrentMultimap;
import co.paralleluniverse.common.concurrent.WithExecutor;
import co.paralleluniverse.common.io.Streamable;
import co.paralleluniverse.common.io.Streamables;
import co.paralleluniverse.common.spring.Component;
import co.paralleluniverse.galaxy.MessageListener;
import co.paralleluniverse.galaxy.Messenger;
import co.paralleluniverse.galaxy.TimeoutException;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.Op;
import com.google.common.util.concurrent.ListenableFuture;
import java.beans.ConstructorProperties;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/paralleluniverse/galaxy/core/MessengerImpl.class */
public class MessengerImpl extends Component implements Messenger {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(MessengerImpl.class);
    // Counter behind createTopic(); each call returns the next incremented value.
    private final AtomicLong topicGenerator;
    // Transport: outgoing messages are handed to it, and incoming ones arrive
    // through the receiver this class registers on it in the constructor.
    private final Cache cache;
    // Listeners registered per numeric topic.
    private final ConcurrentMultimap<Long, MessageListener, List<MessageListener>> longTopicListeners;
    // Listeners registered per string topic.
    private final ConcurrentMultimap<String, MessageListener, List<MessageListener>> stringTopicListeners;
    // Executor on which listener notification tasks are submitted (see notifyListeners).
    private final NodeOrderedThreadPoolExecutor executor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/paralleluniverse/galaxy/core/MessengerImpl$Msg.class */
    /**
     * Envelope carried inside a cache message: a topic (numeric or string, never both)
     * followed by an opaque payload.
     *
     * <p>Wire format, as produced by {@link #write(DataOutput)}:
     * <pre>
     *   boolean  hasStringTopic
     *   UTF      topic            (when hasStringTopic)
     *   long     topic            (otherwise)
     *   ushort   data length
     *   byte[]   data
     * </pre>
     */
    public static class Msg implements Streamable {
        /** Value held by {@link #lTopic} when the message is addressed by a string topic. */
        private static final long NO_LONG_TOPIC = -1L;
        /** Largest payload whose length still fits the unsigned 16-bit length prefix. */
        private static final int MAX_DATA_LENGTH = 0xFFFF;

        private long lTopic = NO_LONG_TOPIC;
        private String sTopic;
        private byte[] data;

        /** Creates an empty message, to be populated by {@link #read(DataInput)}. */
        public Msg() {
        }

        /**
         * @param j    numeric topic
         * @param bArr payload
         */
        public Msg(long j, byte[] bArr) {
            this(j, (String) null, bArr);
        }

        /**
         * @param str  string topic; must not be {@code null} (checked by assertion only)
         * @param bArr payload
         */
        public Msg(String str, byte[] bArr) {
            this(NO_LONG_TOPIC, str, bArr);
            assert str != null;
        }

        /**
         * @param j          numeric topic
         * @param streamable payload, serialized immediately via {@code Streamables.toByteArray}
         */
        public Msg(long j, Streamable streamable) {
            this(j, (String) null, streamable);
        }

        /**
         * @param str        string topic; must not be {@code null} (checked by assertion only)
         * @param streamable payload, serialized immediately via {@code Streamables.toByteArray}
         */
        public Msg(String str, Streamable streamable) {
            this(NO_LONG_TOPIC, str, streamable);
            assert str != null;
        }

        private Msg(long j, String str, byte[] bArr) {
            this.lTopic = j;
            this.sTopic = str;
            this.data = bArr;
        }

        private Msg(long j, String str, Streamable streamable) {
            this(j, str, Streamables.toByteArray(streamable));
        }

        /** @return {@code true} when this message is addressed by a string topic */
        public boolean hasSTopic() {
            return this.sTopic != null;
        }

        /** @return the numeric topic; {@code -1} when a string topic was set by a constructor or by {@code read} */
        public long getlTopic() {
            return this.lTopic;
        }

        /** @return the string topic, or {@code null} when the message uses a numeric topic */
        public String getsTopic() {
            return this.sTopic;
        }

        /** @return the payload array itself (not a copy) */
        public byte[] getData() {
            return this.data;
        }

        /**
         * Number of bytes {@link #write(DataOutput)} emits for this message.
         *
         * <p>The previous expression left out the 2-byte data length prefix, dropped the
         * payload for numeric topics and the UTF length prefix for string topics, and
         * chose its branch by a different test than {@code write} does.
         *
         * <p>NOTE(review): assumes {@code Streamables.calcUtfLength} returns the encoded
         * byte count without {@code writeUTF}'s 2-byte length prefix - confirm.
         *
         * @throws NullPointerException if no payload has been set
         */
        @Override // co.paralleluniverse.common.io.Streamable
        public int size() {
            final int topicBytes = hasSTopic()
                    ? 2 + Streamables.calcUtfLength(this.sTopic) // writeUTF: 2-byte length + encoded chars
                    : 8;                                         // writeLong
            // 1 byte topic-kind flag + topic + 2 byte data length + payload
            return 1 + topicBytes + 2 + this.data.length;
        }

        /**
         * Serializes this message in the wire format described on the class.
         *
         * @throws IllegalArgumentException if the payload is longer than 65535 bytes; its
         *         length would not survive the 16-bit prefix and the receiver would
         *         silently see a truncated payload
         * @throws IOException if the underlying output fails
         */
        @Override // co.paralleluniverse.common.io.Streamable
        public void write(DataOutput dataOutput) throws IOException {
            // Validate before emitting anything so a rejected message leaves no partial output.
            if (this.data.length > MAX_DATA_LENGTH) {
                throw new IllegalArgumentException("Message data is " + this.data.length
                        + " bytes; at most " + MAX_DATA_LENGTH + " bytes can be sent");
            }
            final boolean stringTopic = hasSTopic();
            dataOutput.writeBoolean(stringTopic);
            if (stringTopic) {
                dataOutput.writeUTF(this.sTopic);
            } else {
                dataOutput.writeLong(this.lTopic);
            }
            dataOutput.writeShort(this.data.length); // read back with readUnsignedShort
            dataOutput.write(this.data);
        }

        /**
         * Replaces this message's topic and payload with the values read from {@code dataInput}.
         *
         * @throws IOException if the underlying input fails or ends prematurely
         */
        @Override // co.paralleluniverse.common.io.Streamable
        public void read(DataInput dataInput) throws IOException {
            if (dataInput.readBoolean()) {
                this.lTopic = NO_LONG_TOPIC;
                this.sTopic = dataInput.readUTF();
            } else {
                this.lTopic = dataInput.readLong();
                this.sTopic = null;
            }
            this.data = new byte[dataInput.readUnsignedShort()];
            dataInput.readFully(this.data);
        }

        /** Describes the topic and the payload size; payload bytes are not printed. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Msg[");
            sb.append("Topic: ");
            if (this.sTopic != null) {
                sb.append('\"').append(this.sTopic).append('\"');
            } else {
                sb.append(this.lTopic);
            }
            sb.append(" data: ");
            if (this.data == null) {
                sb.append("null");
            } else {
                sb.append("(").append(this.data.length).append(" bytes)");
            }
            sb.append("]");
            return sb.toString();
        }
    }

    /**
     * Wires the messenger to its cache and executor and registers itself as the
     * cache's message receiver.
     *
     * @param str                           component name, passed to the superclass
     * @param cache                         cache used to send and receive messages
     * @param nodeOrderedThreadPoolExecutor executor used to run listener notifications
     * @throws RuntimeException if {@code nodeOrderedThreadPoolExecutor} is {@code null}
     */
    @ConstructorProperties({"name", "cache", "threadPool"})
    MessengerImpl(String str, Cache cache, NodeOrderedThreadPoolExecutor nodeOrderedThreadPoolExecutor) {
        super(str);
        if (nodeOrderedThreadPoolExecutor == null) {
            throw new RuntimeException("The executor must be set!");
        }
        this.executor = nodeOrderedThreadPoolExecutor;
        this.cache = cache;
        this.topicGenerator = new AtomicLong();

        // Both listener maps hand out a CopyOnWriteArrayList for every new topic.
        this.longTopicListeners = new ConcurrentMultimap<Long, MessageListener, List<MessageListener>>(new NonBlockingHashMapLong(), Collections.EMPTY_LIST) {
            @Override
            public List<MessageListener> allocateElement() {
                return new CopyOnWriteArrayList<MessageListener>();
            }
        };
        this.stringTopicListeners = new ConcurrentMultimap<String, MessageListener, List<MessageListener>>(new NonBlockingHashMap(), Collections.EMPTY_LIST) {
            @Override
            public List<MessageListener> allocateElement() {
                return new CopyOnWriteArrayList<MessageListener>();
            }
        };

        cache.setReceiver(new MessageReceiver() {
            @Override
            public void receive(Message message) {
                // Acknowledgements carry nothing for listeners; everything else is dispatched.
                if (message.getType() != Message.Type.MSGACK) {
                    // Qualified on purpose: an unqualified call would resolve to this very method.
                    MessengerImpl.this.receive((Message.MSG) message);
                }
            }
        });
    }

    /**
     * Produces a new numeric topic by advancing the internal counter.
     *
     * @return the counter's value after the increment
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public long createTopic() {
        final long topic = topicGenerator.incrementAndGet();
        return topic;
    }

    /**
     * Registers {@code messageListener} for messages on the numeric topic {@code j}.
     *
     * @param j               numeric topic
     * @param messageListener listener to add to that topic's listener collection
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void addMessageListener(long j, MessageListener messageListener) {
        // The decompiler-emitted casts (Long -> ConcurrentMultimap, MessageListener -> Long)
        // were between inconvertible types and did not compile; key and value are passed as-is.
        this.longTopicListeners.put(Long.valueOf(j), messageListener);
    }

    /**
     * Unregisters {@code messageListener} from the numeric topic {@code j}.
     *
     * @param j               numeric topic
     * @param messageListener listener to remove from that topic's listener collection
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void removeMessageListener(long j, MessageListener messageListener) {
        final Long topic = Long.valueOf(j);
        longTopicListeners.remove(topic, messageListener);
    }

    /**
     * Registers {@code messageListener} for messages on the string topic {@code str}.
     *
     * @param str             string topic
     * @param messageListener listener to add to that topic's listener collection
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void addMessageListener(String str, MessageListener messageListener) {
        // The decompiler-emitted casts (String -> ConcurrentMultimap, MessageListener -> String)
        // were between inconvertible types and did not compile; key and value are passed as-is.
        this.stringTopicListeners.put(str, messageListener);
    }

    /**
     * Unregisters {@code messageListener} from the string topic {@code str}.
     *
     * @param str             string topic
     * @param messageListener listener to remove from that topic's listener collection
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void removeMessageListener(String str, MessageListener messageListener) {
        stringTopicListeners.remove(str, messageListener);
    }

    /**
     * Sends a raw payload to a node on a numeric topic.
     *
     * @param s    destination node
     * @param j    numeric topic
     * @param bArr payload
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void send(short s, long j, byte[] bArr) {
        final Msg envelope = new Msg(j, bArr);
        sendToNode(s, envelope);
    }

    /**
     * Sends a raw payload to a node on a string topic.
     *
     * @param s    destination node
     * @param str  string topic
     * @param bArr payload
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void send(short s, String str, byte[] bArr) {
        if (str == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        final Msg envelope = new Msg(str, bArr);
        sendToNode(s, envelope);
    }

    /**
     * Sends a streamable payload to a node on a numeric topic. The payload is
     * serialized when the envelope is built.
     *
     * @param s          destination node
     * @param j          numeric topic
     * @param streamable payload
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void send(short s, long j, Streamable streamable) {
        final Msg envelope = new Msg(j, streamable);
        sendToNode(s, envelope);
    }

    /**
     * Sends a streamable payload to a node on a string topic. The payload is
     * serialized when the envelope is built.
     *
     * @param s          destination node
     * @param str        string topic
     * @param streamable payload
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void send(short s, String str, Streamable streamable) {
        if (str == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        final Msg envelope = new Msg(str, streamable);
        sendToNode(s, envelope);
    }

    /**
     * Sends a raw payload on a numeric topic, addressed through the cache's SEND
     * operation on {@code j}.
     *
     * @param j    id handed to the cache's SEND operation (the message goes to its owner)
     * @param j2   numeric topic
     * @param bArr payload
     * @throws TimeoutException propagated from the cache operation
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void sendToOwnerOf(long j, long j2, byte[] bArr) throws TimeoutException {
        final Msg envelope = new Msg(j2, bArr);
        sendToOwnerOf(j, envelope);
    }

    /**
     * Sends a raw payload on a string topic, addressed through the cache's SEND
     * operation on {@code j}.
     *
     * @param j    id handed to the cache's SEND operation (the message goes to its owner)
     * @param str  string topic
     * @param bArr payload
     * @throws IllegalArgumentException if {@code str} is {@code null}
     * @throws TimeoutException         propagated from the cache operation
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void sendToOwnerOf(long j, String str, byte[] bArr) throws TimeoutException {
        if (str == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        final Msg envelope = new Msg(str, bArr);
        sendToOwnerOf(j, envelope);
    }

    /**
     * Sends a streamable payload on a numeric topic, addressed through the cache's
     * SEND operation on {@code j}. The payload is serialized when the envelope is built.
     *
     * @param j          id handed to the cache's SEND operation (the message goes to its owner)
     * @param j2         numeric topic
     * @param streamable payload
     * @throws TimeoutException propagated from the cache operation
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void sendToOwnerOf(long j, long j2, Streamable streamable) throws TimeoutException {
        final Msg envelope = new Msg(j2, streamable);
        sendToOwnerOf(j, envelope);
    }

    /**
     * Sends a streamable payload on a string topic, addressed through the cache's
     * SEND operation on {@code j}. The payload is serialized when the envelope is built.
     *
     * @param j          id handed to the cache's SEND operation (the message goes to its owner)
     * @param str        string topic
     * @param streamable payload
     * @throws IllegalArgumentException if {@code str} is {@code null}
     * @throws TimeoutException         propagated from the cache operation
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public void sendToOwnerOf(long j, String str, Streamable streamable) throws TimeoutException {
        if (str == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        final Msg envelope = new Msg(str, streamable);
        sendToOwnerOf(j, envelope);
    }

    /**
     * Asynchronous variant: sends a raw payload on a numeric topic through the
     * cache's asynchronous SEND operation on {@code j}.
     *
     * @param j    id handed to the cache's SEND operation (the message goes to its owner)
     * @param j2   numeric topic
     * @param bArr payload
     * @return the future returned by the cache operation
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public ListenableFuture<Void> sendToOwnerOfAsync(long j, long j2, byte[] bArr) {
        final Msg envelope = new Msg(j2, bArr);
        return sendToOwnerOfAsync(j, envelope);
    }

    /**
     * Asynchronous variant: sends a raw payload on a string topic through the
     * cache's asynchronous SEND operation on {@code j}.
     *
     * @param j    id handed to the cache's SEND operation (the message goes to its owner)
     * @param str  string topic
     * @param bArr payload
     * @return the future returned by the cache operation
     * @throws IllegalArgumentException if {@code str} is {@code null} (thrown by this call, not via the future)
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public ListenableFuture<Void> sendToOwnerOfAsync(long j, String str, byte[] bArr) {
        if (str == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        final Msg envelope = new Msg(str, bArr);
        return sendToOwnerOfAsync(j, envelope);
    }

    /**
     * Asynchronous variant: sends a streamable payload on a numeric topic through the
     * cache's asynchronous SEND operation on {@code j}. The payload is serialized when
     * the envelope is built.
     *
     * @param j          id handed to the cache's SEND operation (the message goes to its owner)
     * @param j2         numeric topic
     * @param streamable payload
     * @return the future returned by the cache operation
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public ListenableFuture<Void> sendToOwnerOfAsync(long j, long j2, Streamable streamable) {
        final Msg envelope = new Msg(j2, streamable);
        return sendToOwnerOfAsync(j, envelope);
    }

    /**
     * Asynchronous variant: sends a streamable payload on a string topic through the
     * cache's asynchronous SEND operation on {@code j}. The payload is serialized when
     * the envelope is built.
     *
     * @param j          id handed to the cache's SEND operation (the message goes to its owner)
     * @param str        string topic
     * @param streamable payload
     * @return the future returned by the cache operation
     * @throws IllegalArgumentException if {@code str} is {@code null} (thrown by this call, not via the future)
     */
    @Override // co.paralleluniverse.galaxy.Messenger
    public ListenableFuture<Void> sendToOwnerOfAsync(long j, String str, Streamable streamable) {
        if (str == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        final Msg envelope = new Msg(str, streamable);
        return sendToOwnerOfAsync(j, envelope);
    }

    /**
     * Serializes {@code envelope} and hands it to the cache as a MSG addressed to {@code node}.
     *
     * @param node     destination node
     * @param envelope topic and payload to send
     */
    private void sendToNode(short node, Msg envelope) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending to node {}: {}", Short.valueOf(node), envelope);
        }
        // Serialization happens after logging, exactly as before.
        final byte[] wireBytes = Streamables.toByteArray(envelope);
        this.cache.send(Message.MSG(node, -1L, wireBytes));
    }

    /**
     * Serializes {@code envelope} and runs the cache's SEND operation on {@code ref},
     * with the MSG addressed to node {@code -1}.
     *
     * @param ref      id the SEND operation is performed on
     * @param envelope topic and payload to send
     * @throws TimeoutException propagated from the cache operation
     */
    private void sendToOwnerOf(long ref, Msg envelope) throws TimeoutException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending to owner of {}: {}", Long.toHexString(ref), envelope);
        }
        final short unknownNode = (short) -1;
        this.cache.doOp(Op.Type.SEND, ref, null, Message.MSG(unknownNode, ref, Streamables.toByteArray(envelope)), null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Serializes {@code envelope} and starts the cache's asynchronous SEND operation on
     * {@code ref}, with the MSG addressed to node {@code -1}.
     *
     * <p>NOTE(review): the decompiler reported a type-inference failure for this method;
     * confirm that {@code doOpAsync}'s declared return type is assignable to
     * {@code ListenableFuture<Void>} without a cast.
     *
     * @param ref      id the SEND operation is performed on
     * @param envelope topic and payload to send
     * @return the future returned by the cache operation
     */
    private ListenableFuture<Void> sendToOwnerOfAsync(long ref, Msg envelope) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending to owner of {}: {}", Long.toHexString(ref), envelope);
        }
        final short unknownNode = (short) -1;
        return this.cache.doOpAsync(Op.Type.SEND, ref, null, Message.MSG(unknownNode, ref, Streamables.toByteArray(envelope)), null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decodes an incoming cache message into a {@link Msg} and, when a listener list
     * exists for its topic, schedules notification of those listeners.
     *
     * @param msg cache message whose data holds a serialized {@link Msg}
     */
    public void receive(Message.MSG msg) {
        final Msg envelope = new Msg();
        Streamables.fromByteArray(envelope, msg.getData());
        LOG.debug("Received: {}", envelope);

        // Pick the listener map matching the kind of topic the sender used.
        final List listeners;
        if (envelope.hasSTopic()) {
            listeners = (List) this.stringTopicListeners.get(envelope.getsTopic());
        } else {
            listeners = (List) this.longTopicListeners.get(Long.valueOf(envelope.getlTopic()));
        }
        if (listeners == null) {
            return;
        }
        notifyListeners(listeners, msg.getNode(), envelope);
    }

    /**
     * Submits a task to the executor that delivers {@code envelope}'s payload to every
     * listener in {@code listeners}. The task reports {@code node} as its node and
     * iterates while holding the monitor of {@code listeners}.
     *
     * <p>A listener that implements {@link WithExecutor} is invoked on the executor it
     * supplies; any other listener is invoked directly inside the task.
     *
     * @param listeners listeners to notify
     * @param node      node the message came from
     * @param envelope  decoded message
     */
    private void notifyListeners(final Collection<MessageListener> listeners, final short node, final Msg envelope) {
        this.executor.execute(new NodeTask() {
            @Override // co.paralleluniverse.galaxy.core.NodeAttached
            public short getNode() {
                return node;
            }

            @Override // java.lang.Runnable
            public void run() {
                synchronized (listeners) {
                    for (final MessageListener listener : listeners) {
                        if (!(listener instanceof WithExecutor)) {
                            MessengerImpl.deliver(listener, node, envelope);
                            continue;
                        }
                        ((WithExecutor) listener).getExecutor().execute(new Runnable() {
                            @Override // java.lang.Runnable
                            public void run() {
                                MessengerImpl.deliver(listener, node, envelope);
                            }
                        });
                    }
                }
            }
        });
    }

    /** Invokes one listener, logging (rather than propagating) any exception it throws. */
    private static void deliver(MessageListener listener, short node, Msg envelope) {
        try {
            listener.messageReceived(node, envelope.getData());
        } catch (Exception e) {
            LOG.error("Listener threw an exception.", e);
        }
    }
}
