package org.codehaus.activemq.transport;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.broker.ConsumerInfoListener;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.transport.composite.CompositeTransportChannel;

/* loaded from: input_file:org/codehaus/activemq/transport/NetworkChannel.class */
public class NetworkChannel implements Service, ConsumerInfoListener {
    private static final Log log;
    private String uri;
    private BrokerContainer brokerContainer;
    private ActiveMQConnection localConnection;
    private ActiveMQConnection remoteConnection;
    private ConcurrentHashMap consumerMap;
    private String remoteUserName;
    private String remotePassword;
    private String remoteBrokerName;
    private String remoteClusterName;
    private int maximumRetries;
    private long reconnectSleepTime;
    static Class class$org$codehaus$activemq$transport$NetworkChannel;

    public NetworkChannel() {
        this.maximumRetries = 0;
        this.reconnectSleepTime = 1000L;
        this.consumerMap = new ConcurrentHashMap();
    }

    public NetworkChannel(BrokerContainer brokerContainer, String str) {
        this();
        this.brokerContainer = brokerContainer;
        this.uri = str;
    }

    public String toString() {
        return new StringBuffer().append(super.toString()).append("[uri=").append(this.uri).append("]").toString();
    }

    @Override // org.codehaus.activemq.service.Service
    public void start() throws JMSException {
        Thread thread = new Thread(new Runnable(this) { // from class: org.codehaus.activemq.transport.NetworkChannel.1
            private final NetworkChannel this$0;

            {
                this.this$0 = this;
            }

            @Override // java.lang.Runnable
            public void run() {
                try {
                    this.this$0.initialize();
                    this.this$0.brokerContainer.getBroker().addConsumerInfoListener(this.this$0);
                    this.this$0.startSubscriptions();
                    NetworkChannel.log.info(new StringBuffer().append("Started NetworkChannel to ").append(this.this$0.uri).toString());
                } catch (JMSException e) {
                    NetworkChannel.log.error(new StringBuffer().append("Failed to start NetworkChannel: ").append(this.this$0.uri).toString());
                }
            }
        }, "NetworkChannel Starter");
        thread.setDaemon(true);
        thread.start();
    }

    @Override // org.codehaus.activemq.service.Service
    public void stop() throws JMSException {
        this.consumerMap.clear();
        if (this.remoteConnection != null) {
            this.remoteConnection.close();
            this.remoteConnection = null;
        }
        if (this.localConnection != null) {
            this.localConnection.close();
            this.localConnection = null;
        }
        Iterator it = this.consumerMap.values().iterator();
        while (it.hasNext()) {
            ((NetworkMessageBridge) it.next()).stop();
        }
    }

    @Override // org.codehaus.activemq.broker.ConsumerInfoListener
    public void onConsumerInfo(BrokerClient brokerClient, ConsumerInfo consumerInfo) {
        if (brokerClient.isClusteredConnection() || consumerInfo.hasVisited(this.remoteBrokerName)) {
            return;
        }
        if (consumerInfo.isStarted()) {
            addConsumerInfo(consumerInfo);
        } else {
            removeConsumerInfo(consumerInfo);
        }
    }

    public String getUri() {
        return this.uri;
    }

    public void setUri(String str) {
        this.uri = str;
    }

    public String getRemotePassword() {
        return this.remotePassword;
    }

    public void setRemotePassword(String str) {
        this.remotePassword = str;
    }

    public String getRemoteUserName() {
        return this.remoteUserName;
    }

    public void setRemoteUserName(String str) {
        this.remoteUserName = str;
    }

    public BrokerContainer getBrokerContainer() {
        return this.brokerContainer;
    }

    public void setBrokerContainer(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }

    public int getMaximumRetries() {
        return this.maximumRetries;
    }

    public void setMaximumRetries(int i) {
        this.maximumRetries = i;
    }

    public long getReconnectSleepTime() {
        return this.reconnectSleepTime;
    }

    public void setReconnectSleepTime(long j) {
        this.reconnectSleepTime = j;
    }

    public String getRemoteBrokerName() {
        return this.remoteBrokerName;
    }

    public void setRemoteBrokerName(String str) {
        this.remoteBrokerName = str;
    }

    private void addConsumerInfo(ConsumerInfo consumerInfo) {
        addConsumerInfo(consumerInfo.getDestination(), consumerInfo.isDurableTopic());
    }

    private void addConsumerInfo(ActiveMQDestination activeMQDestination, boolean z) {
        NetworkMessageBridge networkMessageBridge = new NetworkMessageBridge();
        networkMessageBridge.setDestination(activeMQDestination);
        networkMessageBridge.setDurableTopic(z);
        NetworkMessageBridge networkMessageBridge2 = (NetworkMessageBridge) this.consumerMap.get(networkMessageBridge);
        if (networkMessageBridge2 == null) {
            try {
                networkMessageBridge2 = networkMessageBridge;
                networkMessageBridge2.setLocalBrokerName(this.brokerContainer.getBroker().getBrokerName());
                networkMessageBridge2.setLocalSession(this.localConnection.createSession(false, 2));
                networkMessageBridge2.setRemoteSession(this.remoteConnection.createSession(false, 2));
                this.consumerMap.put(networkMessageBridge2, networkMessageBridge2);
                networkMessageBridge2.start();
                log.info(new StringBuffer().append("started NetworkMessageBridge for destination: ").append(activeMQDestination).toString());
            } catch (JMSException e) {
                log.error(new StringBuffer().append("Failed to start NetworkMessageBridge for destination: ").append(activeMQDestination).toString());
            }
        }
        networkMessageBridge2.incrementReferenceCount();
    }

    private void removeConsumerInfo(ConsumerInfo consumerInfo) {
        NetworkMessageBridge networkMessageBridge = new NetworkMessageBridge();
        networkMessageBridge.setDestination(consumerInfo.getDestination());
        networkMessageBridge.setDurableTopic(consumerInfo.isDurableTopic());
        NetworkMessageBridge networkMessageBridge2 = (NetworkMessageBridge) this.consumerMap.get(networkMessageBridge);
        if (networkMessageBridge2 == null || networkMessageBridge2.decrementReferenceCount() > 0 || networkMessageBridge2.isDurableTopic()) {
            return;
        }
        if (networkMessageBridge2.getDestination().isTopic() || networkMessageBridge2.getDestination().isTemporary()) {
            Thread thread = new Thread(new Runnable(this, networkMessageBridge2, consumerInfo) { // from class: org.codehaus.activemq.transport.NetworkChannel.2
                private final NetworkMessageBridge val$bridge;
                private final ConsumerInfo val$info;
                private final NetworkChannel this$0;

                {
                    this.this$0 = this;
                    this.val$bridge = networkMessageBridge2;
                    this.val$info = consumerInfo;
                }

                @Override // java.lang.Runnable
                public void run() {
                    this.val$bridge.stop();
                    this.this$0.consumerMap.remove(this.val$bridge);
                    NetworkChannel.log.info(new StringBuffer().append("stopped MetworkMessageBridge for destination: ").append(this.val$info.getDestination()).toString());
                }
            });
            thread.setDaemon(true);
            thread.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startSubscriptions() {
        MessageContainerManager persistentTopicContainerManager = this.brokerContainer.getBroker().getPersistentTopicContainerManager();
        if (persistentTopicContainerManager != null) {
            startSubscriptions(persistentTopicContainerManager.getDestinations(), true);
        }
        for (MessageContainerManager messageContainerManager : this.brokerContainer.getBroker().getContainerManagerMap().values()) {
            if (messageContainerManager != persistentTopicContainerManager) {
                startSubscriptions(messageContainerManager.getDestinations(), false);
            }
        }
    }

    private void startSubscriptions(Map map, boolean z) {
        if (map != null) {
            Iterator it = map.values().iterator();
            while (it.hasNext()) {
                addConsumerInfo((ActiveMQDestination) it.next(), z);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initialize() throws JMSException {
        initializeRemote();
        initializeLocal();
    }

    private void initializeRemote() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.remoteUserName, this.remotePassword, this.uri);
        activeMQConnectionFactory.setUseAsyncSend(true);
        this.remoteConnection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        this.remoteConnection.setJ2EEcompliant(false);
        this.remoteConnection.setClientID(new StringBuffer().append(this.brokerContainer.getBroker().getBrokerName()).append("_NetworkChannel").toString());
        TransportChannel transportChannel = this.remoteConnection.getTransportChannel();
        if (transportChannel instanceof CompositeTransportChannel) {
            CompositeTransportChannel compositeTransportChannel = (CompositeTransportChannel) transportChannel;
            compositeTransportChannel.setMaximumRetries(this.maximumRetries);
            compositeTransportChannel.setFailureSleepTime(this.reconnectSleepTime);
        }
        this.remoteConnection.start();
        BrokerInfo brokerInfo = new BrokerInfo();
        brokerInfo.setBrokerName(this.brokerContainer.getBroker().getBrokerName());
        brokerInfo.setClusterName(this.brokerContainer.getBroker().getBrokerClusterName());
        Receipt syncSendRequest = this.remoteConnection.syncSendRequest(brokerInfo);
        this.remoteBrokerName = syncSendRequest.getBrokerName();
        this.remoteClusterName = syncSendRequest.getClusterName();
    }

    private void initializeLocal() throws JMSException {
        String brokerName = this.brokerContainer.getBroker().getBrokerName();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(new StringBuffer().append("vm://").append(brokerName).toString());
        activeMQConnectionFactory.setUseAsyncSend(true);
        activeMQConnectionFactory.setBrokerName(brokerName);
        this.localConnection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        this.localConnection.setJ2EEcompliant(false);
        this.localConnection.start();
        BrokerInfo brokerInfo = new BrokerInfo();
        brokerInfo.setBrokerName(this.remoteBrokerName);
        brokerInfo.setClusterName(this.remoteClusterName);
        this.localConnection.asyncSendPacket(brokerInfo);
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$codehaus$activemq$transport$NetworkChannel == null) {
            cls = class$("org.codehaus.activemq.transport.NetworkChannel");
            class$org$codehaus$activemq$transport$NetworkChannel = cls;
        } else {
            cls = class$org$codehaus$activemq$transport$NetworkChannel;
        }
        log = LogFactory.getLog(cls);
    }
}
