package org.activemq.broker.impl;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import org.activemq.broker.BrokerAdmin;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.BrokerConnector;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.BrokerAdminCommand;
import org.activemq.message.BrokerInfo;
import org.activemq.message.CapacityInfo;
import org.activemq.message.CleanupConnectionInfo;
import org.activemq.message.ConnectionInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.DurableUnsubscribe;
import org.activemq.message.IntResponseReceipt;
import org.activemq.message.KeepAlive;
import org.activemq.message.MessageAck;
import org.activemq.message.Packet;
import org.activemq.message.PacketListener;
import org.activemq.message.ProducerInfo;
import org.activemq.message.Receipt;
import org.activemq.message.ResponseReceipt;
import org.activemq.message.SessionInfo;
import org.activemq.message.TransactionInfo;
import org.activemq.message.XATransactionInfo;
import org.activemq.message.util.SpooledBoundedActiveMQMessageQueue;
import org.activemq.transport.NetworkChannel;
import org.activemq.transport.NetworkConnector;
import org.activemq.transport.TransportChannel;
import org.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/activemq/broker/impl/BrokerClientImpl.class */
public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
    private static final Log log;
    private BrokerConnector brokerConnector;
    private TransportChannel channel;
    private ConnectionInfo connectionInfo;
    private boolean brokerConnection;
    private boolean clusteredConnection;
    private String remoteBrokerName;
    private SpooledBoundedActiveMQMessageQueue spoolQueue;
    private boolean cleanedUp;
    private boolean registered;
    private Subject subject;
    private boolean remoteNetworkConnector;
    static Class class$org$activemq$broker$impl$BrokerClientImpl;
    private int capacity = 100;
    private ArrayList dispatchQueue = new ArrayList();
    private IdGenerator packetIdGenerator = new IdGenerator();
    private SynchronizedBoolean closed = new SynchronizedBoolean(false);
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private Set activeConsumers = new HashSet();
    private CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    private CopyOnWriteArrayList producers = new CopyOnWriteArrayList();
    private CopyOnWriteArrayList transactions = new CopyOnWriteArrayList();
    private CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();

    @Override // org.activemq.broker.BrokerClient
    public void initialize(BrokerConnector brokerConnector, TransportChannel transportChannel) {
        this.brokerConnector = brokerConnector;
        this.channel = transportChannel;
        this.channel.setPacketListener(this);
        this.channel.setExceptionListener(this);
        log.trace("brokerConnectorConnector client initialized");
    }

    @Override // org.activemq.broker.BrokerClient
    public BrokerConnector getBrokerConnector() {
        return this.brokerConnector;
    }

    public void onException(JMSException jMSException) {
        log.info(new StringBuffer().append("Client disconnected: ").append(this).toString());
        log.debug("Disconnect cuase: ", jMSException);
        close();
    }

    public String toString() {
        return new StringBuffer().append(new StringBuffer().append(new StringBuffer().append("brokerConnector-client:(").append(hashCode()).append(") ").toString()).append(this.connectionInfo == null ? "" : this.connectionInfo.getClientId()).toString()).append(": ").append(this.channel).toString();
    }

    @Override // org.activemq.broker.BrokerClient
    public void dispatch(ActiveMQMessage activeMQMessage) {
        if (!isSlowConsumer()) {
            dispatchToClient(activeMQMessage);
            return;
        }
        if (this.spoolQueue == null) {
            log.warn(new StringBuffer().append("Connection: ").append(this.connectionInfo.getClientId()).append(" is a slow consumer").toString());
            try {
                this.spoolQueue = new SpooledBoundedActiveMQMessageQueue(this.brokerConnector.getBrokerContainer().getBroker().getTempDir(), new StringBuffer().append(this.brokerConnector.getBrokerInfo().getBrokerName()).append("_").append(this.connectionInfo.getClientId()).toString());
                new ThreadedExecutor().execute(new Runnable(this, this.spoolQueue) { // from class: org.activemq.broker.impl.BrokerClientImpl.1
                    private final SpooledBoundedActiveMQMessageQueue val$bpq;
                    private final BrokerClientImpl this$0;

                    {
                        this.this$0 = this;
                        this.val$bpq = r5;
                    }

                    @Override // java.lang.Runnable
                    public void run() {
                        while (!this.this$0.closed.get()) {
                            try {
                                ActiveMQMessage dequeue = this.val$bpq.dequeue();
                                if (dequeue != null) {
                                    this.this$0.dispatchToClient(dequeue);
                                }
                            } catch (InterruptedException e) {
                                BrokerClientImpl.log.warn("async dispatch got an interupt", e);
                            } catch (JMSException e2) {
                                BrokerClientImpl.log.error("async dispatch got an problem", e2);
                            }
                        }
                    }
                });
            } catch (IOException e) {
                log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
                close();
            } catch (InterruptedException e2) {
                log.error("Could not create SpooledBoundedQueue for this slow consumer", e2);
                close();
            }
        }
        if (this.spoolQueue != null) {
            try {
                this.spoolQueue.enqueue(activeMQMessage);
            } catch (JMSException e3) {
                log.error(new StringBuffer().append("Could not enqueue message ").append(activeMQMessage).append(" to SpooledBoundedQueue for this slow consumer").toString(), e3);
                close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispatchToClient(Packet packet) {
        if (this.started.get()) {
            send(packet);
            return;
        }
        boolean z = false;
        if (packet.isJMSMessage() && ((ActiveMQMessage) packet).getJMSActiveMQDestination().isAdvisory()) {
            send(packet);
            z = true;
        }
        if (z) {
            return;
        }
        synchronized (this.started) {
            this.dispatchQueue.add(packet);
        }
    }

    @Override // org.activemq.broker.BrokerClient
    public boolean isBrokerConnection() {
        return this.brokerConnection;
    }

    @Override // org.activemq.broker.BrokerClient
    public boolean isClusteredConnection() {
        return this.clusteredConnection;
    }

    @Override // org.activemq.broker.BrokerClient
    public int getCapacity() {
        return this.capacity;
    }

    @Override // org.activemq.broker.BrokerClient
    public String getClientID() {
        if (this.connectionInfo != null) {
            return this.connectionInfo.getClientId();
        }
        return null;
    }

    @Override // org.activemq.broker.BrokerClient
    public TransportChannel getChannel() {
        return this.channel;
    }

    @Override // org.activemq.broker.BrokerClient
    public boolean isSlowConsumer() {
        return this.capacity <= 20;
    }

    @Override // org.activemq.message.PacketListener
    public void consume(Packet packet) {
        if (packet != null) {
            Throwable th = null;
            boolean z = false;
            boolean isReceiptRequired = packet.isReceiptRequired();
            short id = packet.getId();
            String brokerName = this.brokerConnector.getBrokerInfo().getBrokerName();
            String clusterName = this.brokerConnector.getBrokerInfo().getClusterName();
            try {
                if (this.brokerConnection) {
                    packet.addBrokerVisited(this.remoteBrokerName);
                    packet.addBrokerVisited(brokerName);
                }
                if (!packet.isJMSMessage()) {
                    switch (packet.getPacketType()) {
                        case 15:
                            consumeMessageAck((MessageAck) packet);
                            break;
                        case 16:
                        case 25:
                        case Packet.INT_RESPONSE_RECEIPT_INFO /* 26 */:
                        case Packet.WIRE_FORMAT_INFO /* 29 */:
                        case Packet.CACHED_VALUE_COMMAND /* 32 */:
                        default:
                            log.warn(new StringBuffer().append("Unknown Packet received: ").append(packet).toString());
                            break;
                        case 17:
                            consumeConsumerInfo((ConsumerInfo) packet);
                            break;
                        case 18:
                            consumeProducerInfo((ProducerInfo) packet);
                            break;
                        case 19:
                            consumeTransactionInfo((TransactionInfo) packet);
                            break;
                        case 20:
                            consumeXATransactionInfo((XATransactionInfo) packet);
                            break;
                        case 21:
                            consumeBrokerInfo((BrokerInfo) packet);
                            break;
                        case 22:
                            consumeConnectionInfo((ConnectionInfo) packet);
                            break;
                        case 23:
                            consumeSessionInfo((SessionInfo) packet);
                            break;
                        case 24:
                            this.brokerConnector.durableUnsubscribe(this, (DurableUnsubscribe) packet);
                            break;
                        case Packet.CAPACITY_INFO /* 27 */:
                            consumeCapacityInfo((CapacityInfo) packet);
                            break;
                        case Packet.CAPACITY_INFO_REQUEST /* 28 */:
                            updateCapacityInfo(packet.getId());
                            break;
                        case Packet.KEEP_ALIVE /* 30 */:
                            break;
                        case Packet.BROKER_ADMIN_COMMAND /* 31 */:
                            consumeBrokerAdminCommand((BrokerAdminCommand) packet);
                            break;
                        case Packet.CLEANUP_CONNECTION_INFO /* 33 */:
                            consumeCleanupConnectionInfo((CleanupConnectionInfo) packet);
                            break;
                    }
                } else {
                    ActiveMQMessage activeMQMessage = (ActiveMQMessage) packet;
                    if (!this.brokerConnection) {
                        activeMQMessage.setEntryBrokerName(brokerName);
                        activeMQMessage.setEntryClusterName(clusterName);
                    }
                    consumeActiveMQMessage(activeMQMessage);
                }
            } catch (Throwable th2) {
                th = th2;
                log.warn(new StringBuffer().append("caught exception consuming packet: ").append(packet).toString(), th2);
                z = true;
            }
            if (isReceiptRequired) {
                sendReceipt(id, th, z);
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:33:0x00dc, code lost:
    
        if (org.activemq.broker.impl.BrokerClientImpl.log.isDebugEnabled() == false) goto L27;
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x00df, code lost:
    
        org.activemq.broker.impl.BrokerClientImpl.log.debug(new java.lang.StringBuffer().append(r4).append(" has stopped").toString());
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x00fa, code lost:
    
        r4.consumers.clear();
        r4.producers.clear();
        r4.transactions.clear();
        r4.sessions.clear();
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x00d1, code lost:
    
        throw r8;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void consumeCleanupConnectionInfo(org.activemq.message.CleanupConnectionInfo r5) throws javax.jms.JMSException {
        /*
            Method dump skipped, instructions count: 281
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.activemq.broker.impl.BrokerClientImpl.consumeCleanupConnectionInfo(org.activemq.message.CleanupConnectionInfo):void");
    }

    private void consumeBrokerAdminCommand(BrokerAdminCommand brokerAdminCommand) throws JMSException {
        BrokerAdmin brokerAdmin = this.brokerConnector.getBrokerContainer().getBroker().getBrokerAdmin();
        if (BrokerAdminCommand.CREATE_DESTINATION.equals(brokerAdminCommand.getCommand())) {
            brokerAdmin.createMessageContainer(brokerAdminCommand.getDestination());
        } else if (BrokerAdminCommand.DESTROY_DESTINATION.equals(brokerAdminCommand.getCommand())) {
            brokerAdmin.destoryMessageContainer(brokerAdminCommand.getDestination());
        } else {
            if (!BrokerAdminCommand.EMPTY_DESTINATION.equals(brokerAdminCommand.getCommand())) {
                throw new JMSException(new StringBuffer().append("Broker Admin Command type: ").append(brokerAdminCommand.getCommand()).append(" not recognized.").toString());
            }
            brokerAdmin.getMessageContainerAdmin(brokerAdminCommand.getDestination()).empty();
        }
    }

    public void consumeConsumerInfo(ConsumerInfo consumerInfo) throws JMSException {
        this.brokerConnector.getBrokerInfo().getBrokerName();
        if (consumerInfo.isStarted()) {
            this.consumers.add(consumerInfo);
            if (this.activeConsumers.add(consumerInfo)) {
                this.brokerConnector.registerMessageConsumer(this, consumerInfo);
                return;
            }
            return;
        }
        this.consumers.remove(consumerInfo);
        if (this.activeConsumers.remove(consumerInfo)) {
            this.brokerConnector.deregisterMessageConsumer(this, consumerInfo);
        }
    }

    @Override // org.activemq.broker.BrokerClient
    public void updateBrokerCapacity(int i) {
        CapacityInfo capacityInfo = new CapacityInfo();
        capacityInfo.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
        capacityInfo.setCapacity(i);
        capacityInfo.setFlowControlTimeout(getFlowControlTimeout(i));
        send(capacityInfo);
    }

    public void consumeConnectionInfo(ConnectionInfo connectionInfo) throws JMSException {
        this.connectionInfo = connectionInfo;
        if (connectionInfo.isClosed()) {
            try {
                cleanUp();
                if (connectionInfo.isReceiptRequired()) {
                    sendReceipt(connectionInfo.getId(), null, false);
                }
                connectionInfo.setReceiptRequired(false);
                try {
                    Thread.sleep(500L);
                } catch (Throwable th) {
                }
                return;
            } finally {
                close();
            }
        }
        if (!this.registered) {
            this.brokerConnector.registerClient(this, connectionInfo);
            this.registered = true;
        }
        synchronized (this.started) {
            if (connectionInfo.getProperties() != null && connectionInfo.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY) != null) {
                this.channel.setNoDelay(new Boolean(connectionInfo.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY)).booleanValue());
            }
            if (!this.started.get() && connectionInfo.isStarted()) {
                this.started.set(true);
                log.debug(new StringBuffer().append(this).append(" has started running client version ").append(connectionInfo.getClientVersion()).append(" , wire format = ").append(connectionInfo.getWireFormatVersion()).toString());
                Iterator it = this.consumers.iterator();
                while (it.hasNext()) {
                    ((ConsumerInfo) it.next()).setClientId(connectionInfo.getClientId());
                }
                Iterator it2 = this.producers.iterator();
                while (it2.hasNext()) {
                    ((ProducerInfo) it2.next()).setClientId(connectionInfo.getClientId());
                }
                Iterator it3 = this.sessions.iterator();
                while (it3.hasNext()) {
                    ((SessionInfo) it3.next()).setClientId(connectionInfo.getClientId());
                }
                for (int i = 0; i < this.dispatchQueue.size(); i++) {
                    dispatch((ActiveMQMessage) this.dispatchQueue.get(i));
                }
                this.dispatchQueue.clear();
            }
            if (this.started.get() && !connectionInfo.isStarted()) {
                this.started.set(false);
                log.debug(new StringBuffer().append(this).append(" has stopped").toString());
            }
        }
    }

    @Override // org.activemq.service.Service
    public void start() throws JMSException {
        this.channel.start();
    }

    @Override // org.activemq.service.Service
    public void stop() throws JMSException {
        log.trace(new StringBuffer().append("Stopping channel: ").append(this.channel).toString());
        this.channel.stop();
    }

    /* JADX WARN: Code restructure failed: missing block: B:36:0x00d3, code lost:
    
        if (org.activemq.broker.impl.BrokerClientImpl.log.isDebugEnabled() == false) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x00d6, code lost:
    
        org.activemq.broker.impl.BrokerClientImpl.log.debug(new java.lang.StringBuffer().append(r4).append(" has stopped").toString());
     */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x00f1, code lost:
    
        r4.consumers.clear();
        r4.producers.clear();
        r4.transactions.clear();
        r4.sessions.clear();
        r4.brokerConnector.deregisterClient(r4, r4.connectionInfo);
        r4.registered = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:40:0x00c8, code lost:
    
        throw r7;
     */
    @Override // org.activemq.broker.BrokerClient
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public synchronized void cleanUp() {
        /*
            Method dump skipped, instructions count: 353
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.activemq.broker.impl.BrokerClientImpl.cleanUp():void");
    }

    protected void send(Packet packet) {
        if (this.closed.get()) {
            return;
        }
        try {
            if (this.brokerConnection) {
                packet.addBrokerVisited(this.brokerConnector.getBrokerContainer().getBroker().getBrokerName());
                if (packet.hasVisited(this.remoteBrokerName)) {
                    if (log.isDebugEnabled()) {
                        log.debug(new StringBuffer().append("Not Forwarding: ").append(this.remoteBrokerName).append(" has already been visited by packet: ").append(packet).toString());
                        return;
                    }
                    return;
                }
            }
            packet.setId(this.packetIdGenerator.getNextShortSequence());
            this.channel.asyncSend(packet);
        } catch (JMSException e) {
            log.warn(new StringBuffer().append(this).append(" caught exception ").toString(), e);
            close();
        }
    }

    @Override // org.activemq.broker.BrokerClient
    public void validateConnection(int i) throws JMSException {
        KeepAlive keepAlive = new KeepAlive();
        keepAlive.setReceiptRequired(true);
        keepAlive.setId(this.packetIdGenerator.getNextShortSequence());
        if (getChannel().send(keepAlive, i) == null) {
            throw new JMSException("Client did not respond in time");
        }
    }

    protected void close() {
        if (this.closed.commit(false, true)) {
            this.channel.stop();
            log.debug(new StringBuffer().append(this).append(" has closed").toString());
        }
    }

    private void consumeActiveMQMessage(ActiveMQMessage activeMQMessage) throws JMSException {
        this.brokerConnector.sendMessage(this, activeMQMessage);
    }

    private void consumeMessageAck(MessageAck messageAck) throws JMSException {
        this.brokerConnector.acknowledgeMessage(this, messageAck);
    }

    private void consumeTransactionInfo(TransactionInfo transactionInfo) throws JMSException {
        if (transactionInfo.getType() == 101) {
            this.transactions.add(transactionInfo.getTransactionId());
            this.brokerConnector.startTransaction(this, transactionInfo.getTransactionId());
            return;
        }
        if (transactionInfo.getType() == 105) {
            this.brokerConnector.rollbackTransaction(this, transactionInfo.getTransactionId());
        } else if (transactionInfo.getType() == 103) {
            this.brokerConnector.commitTransaction(this, transactionInfo.getTransactionId());
        }
        this.transactions.remove(transactionInfo.getTransactionId());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v43, types: [org.activemq.message.ActiveMQXid[], java.io.Serializable] */
    private void consumeXATransactionInfo(XATransactionInfo xATransactionInfo) throws JMSException, XAException {
        if (xATransactionInfo.getType() == 101) {
            this.brokerConnector.startTransaction(this, xATransactionInfo.getXid());
            return;
        }
        if (xATransactionInfo.getType() == 110) {
            ?? preparedTransactions = this.brokerConnector.getPreparedTransactions(this);
            xATransactionInfo.setReceiptRequired(false);
            ResponseReceipt responseReceipt = new ResponseReceipt();
            responseReceipt.setCorrelationId(xATransactionInfo.getId());
            responseReceipt.setResult(preparedTransactions);
            send(responseReceipt);
            return;
        }
        if (xATransactionInfo.getType() == 113) {
            String resourceManagerId = this.brokerConnector.getResourceManagerId(this);
            xATransactionInfo.setReceiptRequired(false);
            ResponseReceipt responseReceipt2 = new ResponseReceipt();
            responseReceipt2.setId(this.packetIdGenerator.getNextShortSequence());
            responseReceipt2.setCorrelationId(xATransactionInfo.getId());
            responseReceipt2.setResult(resourceManagerId);
            send(responseReceipt2);
            return;
        }
        if (xATransactionInfo.getType() == 106) {
            return;
        }
        if (xATransactionInfo.getType() == 102) {
            int prepareTransaction = this.brokerConnector.prepareTransaction(this, xATransactionInfo.getXid());
            xATransactionInfo.setReceiptRequired(false);
            IntResponseReceipt intResponseReceipt = new IntResponseReceipt();
            intResponseReceipt.setId(this.packetIdGenerator.getNextShortSequence());
            intResponseReceipt.setCorrelationId(xATransactionInfo.getId());
            intResponseReceipt.setResult(prepareTransaction);
            send(intResponseReceipt);
            return;
        }
        if (xATransactionInfo.getType() == 105) {
            this.brokerConnector.rollbackTransaction(this, xATransactionInfo.getXid());
        } else if (xATransactionInfo.getType() == 109) {
            this.brokerConnector.commitTransaction(this, xATransactionInfo.getXid(), true);
        } else {
            if (xATransactionInfo.getType() != 103) {
                throw new JMSException(new StringBuffer().append("Packet type: ").append(xATransactionInfo.getType()).append(" not recognized.").toString());
            }
            this.brokerConnector.commitTransaction(this, xATransactionInfo.getXid(), false);
        }
    }

    private void consumeProducerInfo(ProducerInfo producerInfo) throws JMSException {
        if (producerInfo.isStarted()) {
            this.producers.add(producerInfo);
            this.brokerConnector.registerMessageProducer(this, producerInfo);
        } else {
            this.producers.remove(producerInfo);
            this.brokerConnector.deregisterMessageProducer(this, producerInfo);
        }
    }

    private void consumeSessionInfo(SessionInfo sessionInfo) throws JMSException {
        if (sessionInfo.isStarted()) {
            this.sessions.add(sessionInfo);
            this.brokerConnector.registerSession(this, sessionInfo);
        } else {
            this.sessions.remove(sessionInfo);
            this.brokerConnector.deregisterSession(this, sessionInfo);
        }
    }

    private void consumeCapacityInfo(CapacityInfo capacityInfo) {
        this.capacity = capacityInfo.getCapacity();
    }

    private void updateCapacityInfo(short s) {
        CapacityInfo capacityInfo = new CapacityInfo();
        capacityInfo.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
        capacityInfo.setCorrelationId(s);
        capacityInfo.setCapacity(this.brokerConnector.getBrokerCapacity());
        capacityInfo.setFlowControlTimeout(getFlowControlTimeout(capacityInfo.getCapacity()));
        send(capacityInfo);
    }

    private long getFlowControlTimeout(int i) {
        long j = -1;
        if (i <= 0) {
            j = 10000;
        } else if (i <= 10) {
            j = 1000;
        } else if (i <= 20) {
            j = 10;
        }
        return j;
    }

    private void consumeBrokerInfo(BrokerInfo brokerInfo) {
        this.brokerConnection = true;
        this.started.set(true);
        this.remoteBrokerName = brokerInfo.getBrokerName();
        this.brokerConnector.getBrokerInfo().getBrokerName();
        if (getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName().equals(brokerInfo.getClusterName())) {
            this.clusteredConnection = true;
        }
        if (this.remoteNetworkConnector || !brokerInfo.isRemote()) {
            return;
        }
        try {
            NetworkConnector networkConnector = new NetworkConnector(this.brokerConnector.getBrokerContainer());
            networkConnector.getThreadPool().execute(new Runnable(this, networkConnector, brokerInfo) { // from class: org.activemq.broker.impl.BrokerClientImpl.2
                private final NetworkConnector val$networkConnector;
                private final BrokerInfo val$info;
                private final BrokerClientImpl this$0;

                {
                    this.this$0 = this;
                    this.val$networkConnector = networkConnector;
                    this.val$info = brokerInfo;
                }

                @Override // java.lang.Runnable
                public void run() {
                    try {
                        this.val$networkConnector.addNetworkChannel(new NetworkChannel(this.val$networkConnector, this.this$0.brokerConnector.getBrokerContainer(), this.this$0.channel, this.val$info.getBrokerName(), this.val$info.getClusterName()));
                        this.this$0.brokerConnector.getBrokerContainer().addNetworkConnector(this.val$networkConnector);
                        this.val$networkConnector.start();
                    } catch (JMSException e) {
                        BrokerClientImpl.log.error("Failed to create reverse remote channel", e);
                    }
                }
            });
            log.info(new StringBuffer().append("Started reverse remote channel to ").append(this.remoteBrokerName).toString());
            this.remoteNetworkConnector = true;
        } catch (InterruptedException e) {
            log.error("Failed to create reverse remote channel", e);
        }
    }

    private void sendReceipt(short s, Throwable th, boolean z) {
        Receipt receipt = new Receipt();
        receipt.setCorrelationId(s);
        receipt.setBrokerName(this.brokerConnector.getBrokerInfo().getBrokerName());
        receipt.setClusterName(this.brokerConnector.getBrokerInfo().getClusterName());
        receipt.setException(th);
        receipt.setFailed(z);
        send(receipt);
    }

    @Override // org.activemq.broker.BrokerClient
    public void setSubject(Subject subject) {
        this.subject = subject;
    }

    @Override // org.activemq.broker.BrokerClient
    public Subject getSubject() {
        return this.subject;
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$activemq$broker$impl$BrokerClientImpl == null) {
            cls = class$("org.activemq.broker.impl.BrokerClientImpl");
            class$org$activemq$broker$impl$BrokerClientImpl = cls;
        } else {
            cls = class$org$activemq$broker$impl$BrokerClientImpl;
        }
        log = LogFactory.getLog(cls);
    }
}
