package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.cluster.NodeInfo;
import co.paralleluniverse.galaxy.cluster.ReaderWriters;
import co.paralleluniverse.galaxy.cluster.SlaveConfigurationListener;
import co.paralleluniverse.galaxy.core.Backup;
import co.paralleluniverse.galaxy.core.Cache;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.SlaveComm;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import java.beans.ConstructorProperties;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/paralleluniverse/galaxy/netty/TcpSlaveServerComm.class */
public final class TcpSlaveServerComm extends AbstractTcpServer implements SlaveComm {
    private static final Logger LOG = LoggerFactory.getLogger(TcpSlaveServerComm.class);
    private Backup backup;
    private boolean sentSlave;
    private final ConcurrentMap<Channel, Iterator<Message.BACKUP>> replIters;
    private long lastId;
    private volatile Thread replThread;

    /* renamed from: co.paralleluniverse.galaxy.netty.TcpSlaveServerComm$4, reason: invalid class name */
    /* loaded from: input_file:co/paralleluniverse/galaxy/netty/TcpSlaveServerComm$4.class */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$co$paralleluniverse$galaxy$core$Message$Type = new int[Message.Type.values().length];

        static {
            try {
                $SwitchMap$co$paralleluniverse$galaxy$core$Message$Type[Message.Type.BACKUP_PACKETACK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$paralleluniverse$galaxy$core$Message$Type[Message.Type.INVACK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$paralleluniverse$galaxy$core$Message$Type[Message.Type.INV.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$paralleluniverse$galaxy$core$Message$Type[Message.Type.BACKUP_PACKET.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/paralleluniverse/galaxy/netty/TcpSlaveServerComm$ChannelGroup.class */
    public static class ChannelGroup extends DefaultChannelGroup {
        private final BiMap<NodeInfo, Channel> channels;

        public ChannelGroup(String str) {
            super(str);
            this.channels = Maps.synchronizedBiMap(HashBiMap.create());
        }

        public ChannelGroup() {
            this.channels = Maps.synchronizedBiMap(HashBiMap.create());
        }

        public boolean add(Channel channel) {
            if (channel instanceof ServerChannel) {
                return super.add(channel);
            }
            NodeInfo nodeInfo = TcpSlaveServerComm.getNodeInfo(channel);
            if (nodeInfo == null) {
                TcpSlaveServerComm.LOG.warn("Received connection from an unknown address {}.", channel.getRemoteAddress());
                throw new RuntimeException("Unknown node for address " + channel.getRemoteAddress());
            }
            boolean add = super.add(channel);
            if (add) {
                this.channels.put(nodeInfo, channel);
            }
            return add;
        }

        public boolean remove(Object obj) {
            Channel channel = (Channel) obj;
            boolean remove = super.remove(obj);
            if (remove) {
                this.channels.inverse().remove(channel);
            }
            ChannelNodeInfo.nodeInfo.remove(channel);
            return remove;
        }

        public void clear() {
            super.clear();
            this.channels.clear();
        }

        public boolean contains(Object obj) {
            return obj instanceof NodeInfo ? this.channels.containsKey((NodeInfo) obj) : super.contains(obj);
        }

        public Channel get(NodeInfo nodeInfo) {
            return (Channel) this.channels.get(nodeInfo);
        }

        public NodeInfo get(Channel channel) {
            return (NodeInfo) this.channels.inverse().get(channel);
        }
    }

    @ConstructorProperties({"name", "cluster", "port"})
    public TcpSlaveServerComm(String str, Cluster cluster, int i) throws Exception {
        this(str, cluster, i, null);
    }

    TcpSlaveServerComm(String str, Cluster cluster, int i, ChannelHandler channelHandler) throws Exception {
        super(str, cluster, new ChannelGroup(), i, channelHandler);
        this.replIters = new ConcurrentHashMap();
        cluster.addNodeProperty(IpConstants.IP_ADDRESS, true, true, IpConstants.INET_ADDRESS_READER_WRITER);
        cluster.setNodeProperty(IpConstants.IP_ADDRESS, InetAddress.getLocalHost());
        cluster.addNodeProperty(IpConstants.IP_SLAVE_PORT, true, false, ReaderWriters.INTEGER);
        cluster.setNodeProperty(IpConstants.IP_SLAVE_PORT, Integer.valueOf(i));
        cluster.addSlaveConfigurationListener(new SlaveConfigurationListener() { // from class: co.paralleluniverse.galaxy.netty.TcpSlaveServerComm.1
            @Override // co.paralleluniverse.galaxy.cluster.SlaveConfigurationListener
            public void newMaster(NodeInfo nodeInfo) {
            }

            @Override // co.paralleluniverse.galaxy.cluster.SlaveConfigurationListener
            public void slaveAdded(NodeInfo nodeInfo) {
            }

            @Override // co.paralleluniverse.galaxy.cluster.SlaveConfigurationListener
            public void slaveRemoved(NodeInfo nodeInfo) {
                Channel channel = TcpSlaveServerComm.this.getChannels().get(nodeInfo);
                if (channel != null) {
                    TcpSlaveServerComm.LOG.info("Closing channel for removed node {}", nodeInfo);
                    channel.close();
                }
            }
        });
    }

    @Override // co.paralleluniverse.galaxy.core.SlaveComm
    public void setBackup(Backup backup) {
        assertDuringInitialization();
        this.backup = backup;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.paralleluniverse.galaxy.core.ClusterService, co.paralleluniverse.common.spring.Service, co.paralleluniverse.common.spring.Component
    public void postInit() throws Exception {
        super.postInit();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.paralleluniverse.common.spring.Service, co.paralleluniverse.common.spring.Component
    public void init() throws Exception {
        super.init();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.paralleluniverse.common.spring.Service
    public void available(boolean z) {
        super.available(z);
    }

    @Override // co.paralleluniverse.galaxy.core.ClusterService
    protected void start(boolean z) {
        if (z) {
            bind();
            startReplicationThread();
        }
    }

    @Override // co.paralleluniverse.galaxy.core.ClusterService, co.paralleluniverse.galaxy.cluster.LifecycleListener
    public void switchToMaster() {
        super.switchToMaster();
        bind();
        startReplicationThread();
    }

    @Override // co.paralleluniverse.galaxy.netty.AbstractTcpServer, co.paralleluniverse.common.spring.Component
    public void shutdown() {
        this.replThread.interrupt();
        super.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.paralleluniverse.galaxy.netty.AbstractTcpServer
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = super.getPipeline();
        pipeline.addLast("connections", new SimpleChannelUpstreamHandler() { // from class: co.paralleluniverse.galaxy.netty.TcpSlaveServerComm.2
            public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
                if (TcpSlaveServerComm.this.getChannels().size() > 2) {
                    throw new RuntimeException("Only one slave is currently supported! - " + new ArrayList((Collection) TcpSlaveServerComm.this.getChannels()));
                }
                InetAddress address = ((InetSocketAddress) channelHandlerContext.getChannel().getRemoteAddress()).getAddress();
                if (TcpSlaveServerComm.this.getCluster().getNodesByProperty(IpConstants.IP_ADDRESS, address).isEmpty()) {
                    TcpSlaveServerComm.LOG.warn("An attempt to connect from an unrecognized address {}. No registered cluster node has this address.", address);
                    channelHandlerContext.getChannel().close();
                    return;
                }
                TcpSlaveServerComm.this.replIters.put(channelHandlerContext.getChannel(), TcpSlaveServerComm.this.backup.iterOwned());
                synchronized (TcpSlaveServerComm.this.replIters) {
                    TcpSlaveServerComm.this.replIters.notify();
                }
                super.channelConnected(channelHandlerContext, channelStateEvent);
            }

            public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
                TcpSlaveServerComm.this.ack(channelHandlerContext, null);
                TcpSlaveServerComm.this.replIters.remove(channelHandlerContext.getChannel());
                super.channelDisconnected(channelHandlerContext, channelStateEvent);
            }
        });
        return pipeline;
    }

    @Override // co.paralleluniverse.galaxy.netty.AbstractTcpServer
    protected void receive(ChannelHandlerContext channelHandlerContext, Message message) {
        switch (AnonymousClass4.$SwitchMap$co$paralleluniverse$galaxy$core$Message$Type[message.getType().ordinal()]) {
            case 1:
                ack(channelHandlerContext, (Message.BACKUP_PACKETACK) message);
                return;
            case Cache.CacheLine.MODIFIED /* 2 */:
                invack(channelHandlerContext, (Message.LineMessage) message);
                return;
            default:
                LOG.warn("Unhandled message: {}", message);
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ack(ChannelHandlerContext channelHandlerContext, Message.BACKUP_PACKETACK backup_packetack) {
        synchronized (this) {
            if (backup_packetack != null) {
                if (backup_packetack.getId() != this.lastId) {
                    LOG.warn("Received backup ack id {} which is different from last sent: {}", Long.valueOf(backup_packetack.getId()), Long.valueOf(this.lastId));
                    return;
                }
            }
            LOG.debug("Received backup ack from slave {}", channelHandlerContext.getChannel());
            this.sentSlave = false;
            this.backup.slavesAck(this.lastId);
        }
    }

    private void invack(ChannelHandlerContext channelHandlerContext, Message.LineMessage lineMessage) {
        this.backup.slavesInvAck(lineMessage.getLine());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static NodeInfo getNodeInfo(Channel channel) {
        return (NodeInfo) ChannelNodeInfo.nodeInfo.get(channel);
    }

    @Override // co.paralleluniverse.galaxy.core.SlaveComm
    public synchronized boolean send(Message message) {
        if (message.getType() == Message.Type.BACKUP_PACKET && this.sentSlave) {
            throw new RuntimeException("Previous backup not handled yet!");
        }
        if (!message.isResponse()) {
            message.setMessageId(nextMessageId());
        }
        LOG.debug("Send {}", message);
        HashSet hashSet = new HashSet();
        Iterator it = getChannels().write(message).iterator();
        while (it.hasNext()) {
            hashSet.add(((ChannelFuture) it.next()).getChannel());
        }
        if (hashSet.isEmpty()) {
            LOG.debug("No slaves... Returning false");
            return false;
        }
        LOG.debug("Sending to slaves: {}", hashSet);
        if (hashSet.size() > 1) {
            throw new RuntimeException("Only one slave is currently supported! - " + hashSet);
        }
        switch (AnonymousClass4.$SwitchMap$co$paralleluniverse$galaxy$core$Message$Type[message.getType().ordinal()]) {
            case 3:
                return true;
            case Cache.CacheLine.SLAVE /* 4 */:
                this.lastId = ((Message.BACKUP_PACKET) message).getId();
                this.sentSlave = true;
                return true;
            default:
                LOG.warn("Unhandled message: {}", message);
                return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.paralleluniverse.galaxy.netty.AbstractTcpServer
    public ChannelGroup getChannels() {
        return (ChannelGroup) super.getChannels();
    }

    private void startReplicationThread() {
        if (this.replThread != null) {
            return;
        }
        this.replThread = new Thread(new Runnable() { // from class: co.paralleluniverse.galaxy.netty.TcpSlaveServerComm.3
            @Override // java.lang.Runnable
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        synchronized (TcpSlaveServerComm.this.replIters) {
                            while (TcpSlaveServerComm.this.replIters.isEmpty()) {
                                TcpSlaveServerComm.this.replIters.wait();
                            }
                        }
                        Iterator it = TcpSlaveServerComm.this.replIters.entrySet().iterator();
                        while (it.hasNext()) {
                            Map.Entry entry = (Map.Entry) it.next();
                            Channel channel = (Channel) entry.getKey();
                            Iterator it2 = (Iterator) entry.getValue();
                            int i = 0;
                            while (true) {
                                if (i < 10) {
                                    if (!it2.hasNext()) {
                                        channel.write(Message.BACKUP(-1L, -1L, null));
                                        TcpSlaveServerComm.LOG.debug("Finished replicating to channel {}", channel);
                                        it.remove();
                                        break;
                                    } else {
                                        Message.BACKUP backup = (Message.BACKUP) it2.next();
                                        TcpSlaveServerComm.LOG.debug("Replicating {} to channel {}", backup, channel);
                                        channel.write(backup);
                                        i++;
                                    }
                                } else {
                                    break;
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                    }
                }
                TcpSlaveServerComm.LOG.info("Replication thread interrupted");
            }
        });
        this.replThread.setName("backup-replication");
        this.replThread.setDaemon(true);
        this.replThread.setPriority(4);
        this.replThread.start();
    }
}
