package co.paralleluniverse.remote.galaxy;

import co.paralleluniverse.concurrent.util.MapUtil;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.galaxy.MessageListener;
import co.paralleluniverse.galaxy.cluster.NodeChangeListener;
import co.paralleluniverse.galaxy.quasar.Grid;
import co.paralleluniverse.io.serialization.Serialization;
import co.paralleluniverse.remote.galaxy.GlxRemoteChannel;
import co.paralleluniverse.strands.channels.SendPort;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/paralleluniverse/remote/galaxy/RemoteChannelReceiver.class */
public class RemoteChannelReceiver<Message> implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteChannelReceiver.class);
    private static final ConcurrentMap<SendPort<?>, RemoteChannelReceiver<?>> receivers = MapUtil.newConcurrentHashMap();
    private static final AtomicLong topicGen = new AtomicLong(1000);
    private final SendPort<Message> channel;
    private volatile MessageFilter<Message> filter;
    private final Map<Short, Integer> references = new ConcurrentHashMap();
    private final long topic = topicGen.incrementAndGet();

    /* loaded from: input_file:co/paralleluniverse/remote/galaxy/RemoteChannelReceiver$MessageFilter.class */
    public interface MessageFilter<Message> {
        boolean shouldForwardMessage(Message message);
    }

    public static <Message> RemoteChannelReceiver<Message> getReceiver(SendPort<Message> sendPort) {
        RemoteChannelReceiver<?> remoteChannelReceiver = receivers.get(sendPort);
        if (remoteChannelReceiver == null) {
            remoteChannelReceiver = new RemoteChannelReceiver<>(sendPort);
            RemoteChannelReceiver<?> putIfAbsent = receivers.putIfAbsent(sendPort, remoteChannelReceiver);
            if (putIfAbsent == null) {
                remoteChannelReceiver.subscribe();
            } else {
                remoteChannelReceiver = putIfAbsent;
            }
        }
        return (RemoteChannelReceiver<Message>) remoteChannelReceiver;
    }

    void shutdown() {
        unsubscribe();
        receivers.remove(this.channel);
    }

    private RemoteChannelReceiver(SendPort<Message> sendPort) {
        this.channel = sendPort;
        try {
            Grid.getInstance().cluster().addNodeChangeListener(new NodeChangeListener() { // from class: co.paralleluniverse.remote.galaxy.RemoteChannelReceiver.1
                public void nodeAdded(short s) {
                }

                public void nodeSwitched(short s) {
                }

                public void nodeRemoved(short s) {
                    RemoteChannelReceiver.LOG.debug("decrease RefCount for {} from node {}", this, Short.valueOf(s));
                    RemoteChannelReceiver.this.references.remove(Short.valueOf(s));
                    if (RemoteChannelReceiver.this.references.isEmpty()) {
                        RemoteChannelReceiver.LOG.debug("Shutting down receiver due to zero references" + this);
                        RemoteChannelReceiver.this.shutdown();
                    }
                }
            });
        } catch (InterruptedException e) {
            LOG.error(e.toString());
        }
    }

    public void setFilter(MessageFilter<Message> messageFilter) {
        this.filter = messageFilter;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void messageReceived(short s, byte[] bArr) {
        Object read = Serialization.getInstance().read(bArr);
        LOG.debug("Received: " + read);
        if (read instanceof GlxRemoteChannel.CloseMessage) {
            Throwable exception = ((GlxRemoteChannel.CloseMessage) read).getException();
            if (exception != null) {
                this.channel.close(exception);
            } else {
                this.channel.close();
            }
            unsubscribe();
            return;
        }
        if (read instanceof GlxRemoteChannel.RefMessage) {
            handleRefMessage((GlxRemoteChannel.RefMessage) read);
            return;
        }
        if (this.filter == null || this.filter.shouldForwardMessage(read)) {
            try {
                this.channel.send(read);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (SuspendExecution e2) {
                throw new AssertionError(e2);
            }
        }
    }

    private void subscribe() {
        GlxRemoteChannel.getMessenger().addMessageListener(Long.valueOf(this.topic).longValue(), this);
    }

    private void unsubscribe() {
        GlxRemoteChannel.getMessenger().removeMessageListener(this.topic, this);
    }

    public long getTopic() {
        return this.topic;
    }

    void handleRefMessage(GlxRemoteChannel.RefMessage refMessage) throws RuntimeException {
        LOG.debug("handling: " + refMessage);
        if (refMessage.isAdd()) {
            Integer num = this.references.get(Short.valueOf(refMessage.getNodeId()));
            if (num == null) {
                this.references.put(Short.valueOf(refMessage.getNodeId()), 1);
                return;
            } else {
                this.references.put(Short.valueOf(refMessage.getNodeId()), Integer.valueOf(num.intValue() + 1));
                return;
            }
        }
        Integer num2 = this.references.get(Short.valueOf(refMessage.getNodeId()));
        if (num2 == null) {
            throw new RuntimeException("decrease reference counter message received for unknown cluster node");
        }
        Integer valueOf = Integer.valueOf(num2.intValue() - 1);
        if (valueOf.intValue() > 0) {
            this.references.put(Short.valueOf(refMessage.getNodeId()), valueOf);
            return;
        }
        this.references.remove(Short.valueOf(refMessage.getNodeId()));
        if (this.references.isEmpty()) {
            shutdown();
        }
    }
}
