package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.common.collection.Util;
import co.paralleluniverse.common.monitoring.ThreadPoolExecutorMonitor;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.cluster.NodeInfo;
import co.paralleluniverse.galaxy.core.ClusterService;
import co.paralleluniverse.galaxy.core.CommThread;
import co.paralleluniverse.galaxy.core.Message;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/paralleluniverse/galaxy/netty/AbstractTcpClient.class */
abstract class AbstractTcpClient extends ClusterService {
    private final Logger LOG;
    private String nodeName;
    private final String portProperty;
    private InetSocketAddress address;
    private final ChannelPipelineFactory origChannelFacotry;
    private final ChannelFactory channelFactory;
    private final ClientBootstrap bootstrap;
    private boolean connecting;
    private Channel channel;
    private volatile boolean reconnect;
    private final Lock channelLock;
    private final Condition channelConnected;
    private final Deque<Message> pendingReply;
    private final ExecutorService executor;
    private ThreadPoolExecutor bossExecutor;
    private ThreadPoolExecutor workerExecutor;
    private OrderedMemoryAwareThreadPoolExecutor receiveExecutor;
    private final ChannelHandler channelHandler;

    public AbstractTcpClient(String str, final Cluster cluster, String str2) throws Exception {
        super(str, cluster);
        this.LOG = LoggerFactory.getLogger(AbstractTcpClient.class.getName() + "." + getName());
        this.channelLock = new ReentrantLock();
        this.channelConnected = this.channelLock.newCondition();
        this.pendingReply = new ConcurrentLinkedDeque();
        this.executor = Executors.newSingleThreadExecutor();
        this.channelHandler = new SimpleChannelHandler() { // from class: co.paralleluniverse.galaxy.netty.AbstractTcpClient.5
            public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
                Message message = (Message) messageEvent.getMessage();
                AbstractTcpClient.this.LOG.debug("Received {}", message);
                AbstractTcpClient.this.pendingReply.removeLastOccurrence(message);
                AbstractTcpClient.this.receive(channelHandlerContext, message);
            }

            public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
                AbstractTcpClient.this.channelLock.lock();
                try {
                    AbstractTcpClient.this.channel = channelStateEvent.getChannel();
                    if (!AbstractTcpClient.this.connecting) {
                        AbstractTcpClient.this.LOG.info("Asked to disconnect from newly connected channel {}. Closing.", AbstractTcpClient.this.channel);
                        AbstractTcpClient.this.channel.close();
                        AbstractTcpClient.this.channelLock.unlock();
                        return;
                    }
                    AbstractTcpClient.this.LOG.debug("Set channel to {}", AbstractTcpClient.this.channel);
                    for (Message message : Util.reverse(AbstractTcpClient.this.pendingReply)) {
                        AbstractTcpClient.this.LOG.debug("Sending pending message {} (channel connected)", message);
                        AbstractTcpClient.this.channel.write(message);
                        AbstractTcpClient.this.LOG.debug("Message {} written", message);
                    }
                    AbstractTcpClient.this.setReady(true);
                    AbstractTcpClient.this.connecting = false;
                    AbstractTcpClient.this.channelConnected.signalAll();
                    AbstractTcpClient.this.channelLock.unlock();
                    super.channelConnected(channelHandlerContext, channelStateEvent);
                } catch (Throwable th) {
                    AbstractTcpClient.this.channelLock.unlock();
                    throw th;
                }
            }

            public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
                AbstractTcpClient.this.channelLock.lock();
                try {
                    if (channelHandlerContext.getChannel() == AbstractTcpClient.this.channel) {
                        AbstractTcpClient.this.setReady(false);
                        if (AbstractTcpClient.this.channel != null) {
                            AbstractTcpClient.this.channel.close();
                        }
                        AbstractTcpClient.this.channel = null;
                        AbstractTcpClient.this.connectLater();
                    }
                    super.channelDisconnected(channelHandlerContext, channelStateEvent);
                } finally {
                    AbstractTcpClient.this.channelLock.unlock();
                }
            }

            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
                AbstractTcpClient.this.channelLock.lock();
                try {
                    AbstractTcpClient.this.LOG.info("Channel exception: {} {}", exceptionEvent.getCause().getClass().getName(), exceptionEvent.getCause().getMessage());
                    AbstractTcpClient.this.LOG.debug("Channel exception", exceptionEvent.getCause());
                    AbstractTcpClient.this.setReady(false);
                    if (AbstractTcpClient.this.channel != null) {
                        AbstractTcpClient.this.channel.close();
                    }
                    AbstractTcpClient.this.channel = null;
                    AbstractTcpClient.this.channelLock.unlock();
                    AbstractTcpClient.this.connectLater();
                } catch (Throwable th) {
                    AbstractTcpClient.this.channelLock.unlock();
                    AbstractTcpClient.this.connectLater();
                    throw th;
                }
            }
        };
        this.portProperty = str2;
        if (this.bossExecutor == null) {
            this.bossExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        }
        if (this.workerExecutor == null) {
            this.workerExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        }
        configureThreadPool(str + "-tcpClientBoss", this.bossExecutor);
        configureThreadPool(str + "-tcpClientWorker", this.workerExecutor);
        if (this.receiveExecutor != null) {
            configureThreadPool(str + "-tcpClientReceive", this.receiveExecutor);
        }
        this.channelFactory = new NioClientSocketChannelFactory(this.bossExecutor, this.workerExecutor);
        this.bootstrap = new ClientBootstrap(this.channelFactory);
        this.origChannelFacotry = new TcpMessagePipelineFactory(this.LOG, null, this.receiveExecutor) { // from class: co.paralleluniverse.galaxy.netty.AbstractTcpClient.1
            @Override // co.paralleluniverse.galaxy.netty.TcpMessagePipelineFactory
            public ChannelPipeline getPipeline() throws Exception {
                final ChannelPipeline pipeline = super.getPipeline();
                pipeline.addBefore("messageCodec", "nodeNameWriter", new ChannelNodeNameWriter(cluster));
                pipeline.addBefore("nodeNameWriter", "nodeInfoSetter", new SimpleChannelUpstreamHandler() { // from class: co.paralleluniverse.galaxy.netty.AbstractTcpClient.1.1
                    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
                        if (AbstractTcpClient.this.nodeName == null) {
                            throw new RuntimeException("nodeName not set!");
                        }
                        ChannelNodeInfo.nodeInfo.set(channelHandlerContext.getChannel(), cluster.getNodeInfoByName(AbstractTcpClient.this.nodeName));
                        super.channelConnected(channelHandlerContext, channelStateEvent);
                        pipeline.remove(this);
                    }
                });
                pipeline.addLast("router", AbstractTcpClient.this.channelHandler);
                return pipeline;
            }
        };
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { // from class: co.paralleluniverse.galaxy.netty.AbstractTcpClient.2
            public ChannelPipeline getPipeline() throws Exception {
                return AbstractTcpClient.this.getPipeline();
            }
        });
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("keepAlive", true);
        this.reconnect = true;
    }

    @Override // co.paralleluniverse.common.spring.Component
    public void shutdown() {
        this.LOG.info("Shutting down.");
        disconnect();
        this.channelFactory.releaseExternalResources();
        this.executor.shutdownNow();
    }

    public void setBossExecutor(ThreadPoolExecutor threadPoolExecutor) {
        assertDuringInitialization();
        this.bossExecutor = threadPoolExecutor;
    }

    public void setWorkerExecutor(ThreadPoolExecutor threadPoolExecutor) {
        assertDuringInitialization();
        this.workerExecutor = threadPoolExecutor;
    }

    public void setReceiveExecutor(OrderedMemoryAwareThreadPoolExecutor orderedMemoryAwareThreadPoolExecutor) {
        assertDuringInitialization();
        this.receiveExecutor = orderedMemoryAwareThreadPoolExecutor;
    }

    private void configureThreadPool(String str, ThreadPoolExecutor threadPoolExecutor) {
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        threadPoolExecutor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat(str + "-%d").setThreadFactory(new ThreadFactory() { // from class: co.paralleluniverse.galaxy.netty.AbstractTcpClient.3
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new CommThread(runnable);
            }
        }).build());
        ThreadPoolExecutorMonitor.register(str, threadPoolExecutor);
    }

    protected ChannelPipeline getPipeline() throws Exception {
        return this.origChannelFacotry.getPipeline();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setNodeName(String str) {
        this.nodeName = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getNodeName() {
        return this.nodeName;
    }

    private InetSocketAddress getAddress(NodeInfo nodeInfo, String str) {
        InetAddress inetAddress = (InetAddress) nodeInfo.get(IpConstants.IP_ADDRESS);
        Integer num = (Integer) nodeInfo.get(str);
        if (inetAddress != null && num != null) {
            return new InetSocketAddress(inetAddress, num.intValue());
        }
        if (inetAddress == null) {
            this.LOG.warn("Socket address (property {}) not set for node {}", IpConstants.IP_ADDRESS, nodeInfo);
        }
        if (num != null) {
            return null;
        }
        this.LOG.warn("Socket port (property {}) not set for node {}", str, nodeInfo);
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reconnect(String str) {
        if (str == null) {
            throw new IllegalArgumentException("nodeName cannot be null!");
        }
        this.channelLock.lock();
        try {
            if (!str.equals(this.nodeName)) {
                disconnect();
                setNodeName(str);
            }
            this.reconnect = true;
            connectLater();
        } finally {
            this.channelLock.unlock();
        }
    }

    private boolean isConnected() {
        this.channelLock.lock();
        try {
            return this.channel != null;
        } finally {
            this.channelLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void connectLater() {
        this.executor.submit(new Runnable() { // from class: co.paralleluniverse.galaxy.netty.AbstractTcpClient.4
            @Override // java.lang.Runnable
            public void run() {
                AbstractTcpClient.this.connect();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Code restructure failed: missing block: B:33:0x0017, code lost:
    
        r5.channelLock.unlock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x001f, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void connect() {
        /*
            Method dump skipped, instructions count: 290
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: co.paralleluniverse.galaxy.netty.AbstractTcpClient.connect():void");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void disconnect() {
        this.LOG.info("Disconnecting from node {} - {}", this.nodeName, this.address);
        this.channelLock.lock();
        try {
            this.connecting = false;
            this.reconnect = false;
            if (this.channel != null) {
                this.LOG.debug("Closing channel {}", this.channel);
                this.channel.close().awaitUninterruptibly();
            }
            this.channel = null;
            this.channelLock.unlock();
        } catch (Throwable th) {
            this.channelLock.unlock();
            throw th;
        }
    }

    private Channel getChannel() {
        Channel channel = this.channel;
        if (channel != null) {
            return channel;
        }
        this.channelLock.lock();
        while (this.channel == null) {
            try {
                try {
                    this.channelConnected.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } catch (Throwable th) {
                this.channelLock.unlock();
                throw th;
            }
        }
        Channel channel2 = this.channel;
        this.channelLock.unlock();
        return channel2;
    }

    public void send(Message message) {
        this.LOG.debug("Send {}", message);
        if (message.getType().isOf(Message.Type.REQUIRES_RESPONSE)) {
            this.pendingReply.addFirst(message);
        } else {
            this.LOG.debug("Message {} does not require a response.", message);
        }
        this.channelLock.lock();
        try {
            if (this.channel != null) {
                this.channel.write(message);
                this.LOG.debug("Message {} written", message);
            } else {
                this.LOG.debug("Message {} not written b/c channel is not yet connected. Keeping as pending.", message);
            }
        } finally {
            this.channelLock.unlock();
        }
    }

    protected abstract void receive(ChannelHandlerContext channelHandlerContext, Message message);
}
