package org.servicemix.jbi.nmr.flow.jms;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.advisories.ConsumerAdvisor;
import org.activemq.advisories.ConsumerAdvisoryEvent;
import org.activemq.advisories.ConsumerAdvisoryEventListener;
import org.activemq.message.ConsumerInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.servicemix.jbi.framework.ComponentConnector;
import org.servicemix.jbi.framework.ComponentNameSpace;
import org.servicemix.jbi.framework.ComponentPacket;
import org.servicemix.jbi.framework.ComponentPacketEvent;
import org.servicemix.jbi.framework.ComponentPacketEventListener;
import org.servicemix.jbi.framework.LocalComponentConnector;
import org.servicemix.jbi.messaging.MessageExchangeImpl;
import org.servicemix.jbi.nmr.Broker;
import org.servicemix.jbi.nmr.flow.AbstractFlow;

/* loaded from: input_file:org/servicemix/jbi/nmr/flow/jms/JMSFlow.class */
/**
 * A flow that moves message exchanges and component activation state between
 * containers over a JMS (ActiveMQ) connection.
 *
 * <p>As set up by {@link #init(Broker, String)}:
 * <ul>
 *   <li>message exchanges are sent as {@link ObjectMessage}s to queues named
 *       {@code "org.servicemix.inbound." + name};</li>
 *   <li>{@link ComponentPacketEvent}s are published as {@link ObjectMessage}s on
 *       the broadcast topic ({@link #getBroadcastDestinationName()});</li>
 *   <li>a {@link ConsumerAdvisor} on the broadcast topic reports consumers that
 *       start or stop, see {@link #onEvent(ConsumerAdvisoryEvent)}.</li>
 * </ul>
 *
 * <p>Configuration properties ({@code jmsURL}, {@code userName}, {@code password},
 * {@code connectionFactory}, {@code broadcastDestinationName}) are read by
 * {@code init}; changing them afterwards has no effect on the open connection.
 */
public class JMSFlow extends AbstractFlow implements ConsumerAdvisoryEventListener, MessageListener, ComponentPacketEventListener {
    private static final Log log = LogFactory.getLog(JMSFlow.class);

    /** Prefix of every queue this flow consumes from or sends exchanges to. */
    private static final String INBOUND_PREFIX = "org.servicemix.inbound.";

    private String userName;
    private String password;
    private ActiveMQConnectionFactory connectionFactory;
    private ActiveMQConnection connection;
    /** Producer without a fixed destination; the target queue is passed on each send. */
    private MessageProducer queueProducer;
    /** Producer bound to {@link #broadcastTopic}. */
    private MessageProducer topicProducer;
    private Topic broadcastTopic;
    private Session broadcastSession;
    private Session inboundSession;
    private ConsumerAdvisor advisor;
    private String jmsURL = "peer://org.servicemix?persistent=false";
    private String broadcastDestinationName = "org.servicemix.JMSFlow";

    /** Remote container name -> Set of ComponentPacket announced by that container. */
    private final Map networkNodeKeyMap = new ConcurrentHashMap();
    /** ComponentNameSpace -> remote container name; maintained here but not read in this class. */
    private final Map networkComponentKeyMap = new ConcurrentHashMap();
    /** Local component name -> MessageConsumer on that component's inbound queue. */
    private final Map consumerMap = new ConcurrentHashMap();
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * @return the fixed description {@code "jms"}
     */
    @Override // org.servicemix.jbi.management.MBeanInfoProvider
    public String getDescription() {
        return "jms";
    }

    /**
     * @return the URL handed to the {@link ActiveMQConnectionFactory} created by
     *         {@code init} when no factory was injected
     */
    public String getJmsURL() {
        return this.jmsURL;
    }

    /**
     * @param str broker URL; when {@code null}, {@code init} uses the factory's no-arg constructor
     */
    public void setJmsURL(String str) {
        this.jmsURL = str;
    }

    /**
     * @return the password used together with {@link #getUserName()} when connecting
     */
    public String getPassword() {
        return this.password;
    }

    /**
     * @param str password passed to {@code createConnection} when a user name is set
     */
    public void setPassword(String str) {
        this.password = str;
    }

    /**
     * @return the user name used when connecting, or {@code null} for an anonymous connection
     */
    public String getUserName() {
        return this.userName;
    }

    /**
     * @param str user name; when {@code null}, {@code init} connects without credentials
     */
    public void setUserName(String str) {
        this.userName = str;
    }

    /**
     * @return the connection factory in use, or {@code null} if none was injected
     *         and {@code init} has not created one yet
     */
    public ActiveMQConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * @param activeMQConnectionFactory factory to use instead of one built from {@link #getJmsURL()}
     */
    public void setConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        this.connectionFactory = activeMQConnectionFactory;
    }

    /**
     * @return name of the topic on which component events are broadcast
     */
    public String getBroadcastDestinationName() {
        return this.broadcastDestinationName;
    }

    /**
     * @param str name of the topic on which component events are broadcast
     */
    public void setBroadcastDestinationName(String str) {
        this.broadcastDestinationName = str;
    }

    /**
     * Registers this flow as a component packet listener and opens the JMS
     * connection, sessions, consumers, producers and the consumer advisor.
     *
     * @param broker the broker this flow belongs to; its container name is used
     *               as the JMS client id and as the suffix of the container's inbound queue
     * @param str    passed through to {@code AbstractFlow.init}
     * @throws JBIException wrapping any {@link JMSException} raised during set-up;
     *                      a connection opened before the failure is closed again
     */
    @Override // org.servicemix.jbi.nmr.flow.AbstractFlow, org.servicemix.jbi.nmr.flow.Flow
    public void init(Broker broker, String str) throws JBIException {
        super.init(broker, str);
        broker.getRegistry().addComponentPacketListener(this);
        try {
            if (this.connectionFactory == null) {
                if (this.jmsURL != null) {
                    this.connectionFactory = new ActiveMQConnectionFactory(this.jmsURL);
                } else {
                    this.connectionFactory = new ActiveMQConnectionFactory();
                }
            }
            if (this.userName != null) {
                this.connection = this.connectionFactory.createConnection(this.userName, this.password);
            } else {
                this.connection = this.connectionFactory.createConnection();
            }
            this.connection.setClientID(broker.getContainerName());
            this.connection.start();

            // Inbound side: non-transacted session; 1 is javax.jms.Session.AUTO_ACKNOWLEDGE.
            this.inboundSession = this.connection.createSession(false, 1);
            String containerQueueName = INBOUND_PREFIX + broker.getContainerName();
            MessageConsumer containerConsumer = this.inboundSession.createConsumer(this.inboundSession.createQueue(containerQueueName));
            containerConsumer.setMessageListener(this);
            this.queueProducer = this.inboundSession.createProducer((Destination) null);

            // Broadcast side: non-transacted session; 1 is javax.jms.Session.AUTO_ACKNOWLEDGE.
            this.broadcastSession = this.connection.createSession(false, 1);
            this.broadcastTopic = this.broadcastSession.createTopic(this.broadcastDestinationName);
            // No selector; third argument is the JMS noLocal flag.
            MessageConsumer broadcastConsumer = this.broadcastSession.createConsumer(this.broadcastTopic, (String) null, true);
            broadcastConsumer.setMessageListener(this);
            this.topicProducer = this.broadcastSession.createProducer(this.broadcastTopic);
            // 1 is javax.jms.DeliveryMode.NON_PERSISTENT.
            this.topicProducer.setDeliveryMode(1);

            this.advisor = new ConsumerAdvisor(this.connection, this.broadcastTopic);
            this.advisor.addListener(this);
        } catch (JMSException e) {
            log.error("Failed to initialize JMSFlow", e);
            // Do not leave a half-initialised connection open.
            closeConnection();
            throw new JBIException(e);
        }
    }

    /**
     * Starts the flow and its consumer advisor. Does nothing if already started.
     *
     * @throws JBIException if the superclass fails to start or the advisor
     *                      raises a {@link JMSException} (kept as the cause)
     */
    @Override // org.servicemix.jbi.nmr.flow.AbstractFlow, org.servicemix.jbi.management.BaseLifeCycle, javax.jbi.management.LifeCycleMBean, javax.jbi.component.ComponentLifeCycle
    public void start() throws JBIException {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        super.start();
        try {
            this.advisor.start();
        } catch (JMSException e) {
            throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the flow and its consumer advisor. Does nothing if not started.
     *
     * @throws JBIException if the superclass fails to stop or the advisor
     *                      raises a {@link JMSException} (kept as the cause)
     */
    @Override // org.servicemix.jbi.nmr.flow.AbstractFlow, org.servicemix.jbi.management.BaseLifeCycle, javax.jbi.management.LifeCycleMBean, javax.jbi.component.ComponentLifeCycle
    public void stop() throws JBIException {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        super.stop();
        try {
            this.advisor.stop();
        } catch (JMSException e) {
            throw new JBIException("JMSException caught in stop: " + e.getMessage(), e);
        }
    }

    /**
     * Shuts the flow down, stops it if it is running, and closes the JMS connection.
     * The connection is closed even when shutting down or stopping fails.
     *
     * @throws JBIException propagated from the superclass or from {@link #stop()}
     */
    @Override // org.servicemix.jbi.nmr.flow.AbstractFlow, org.servicemix.jbi.management.BaseLifeCycle, javax.jbi.management.LifeCycleMBean, javax.jbi.component.ComponentLifeCycle
    public void shutDown() throws JBIException {
        try {
            super.shutDown();
            stop();
        } finally {
            closeConnection();
        }
    }

    /** Closes the JMS connection if one is open; a failure to close is logged, not thrown. */
    private void closeConnection() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (JMSException e) {
            log.warn("Error closing JMS Connection", e);
        } finally {
            this.connection = null;
        }
    }

    /**
     * @return the number of active consumers the advisor reports for the broadcast topic
     */
    public int numberInNetwork() {
        return this.advisor.activeConsumers(this.broadcastTopic).size();
    }

    /**
     * Handles a local component event: maintains the component's inbound queue
     * consumer and publishes the event on the broadcast topic.
     *
     * <p>On ACTIVATED a consumer is created for the component's queue unless one is
     * already registered; on DEACTIVATED the registered consumer, if any, is closed.
     * Any {@link JMSException} is logged and not rethrown.
     *
     * @param componentPacketEvent the event to process and broadcast
     */
    @Override // org.servicemix.jbi.framework.ComponentPacketEventListener
    public void onEvent(ComponentPacketEvent componentPacketEvent) {
        try {
            String name = componentPacketEvent.getPacket().getComponentNameSpace().getName();
            if (componentPacketEvent.getStatus() == ComponentPacketEvent.ACTIVATED) {
                // ACTIVATED is re-sent for every local component whenever a peer joins
                // (see onEvent(ConsumerAdvisoryEvent)); creating a consumer each time
                // would orphan the previous one, so only subscribe once per component.
                if (!this.consumerMap.containsKey(name)) {
                    MessageConsumer consumer = this.inboundSession.createConsumer(this.inboundSession.createQueue(INBOUND_PREFIX + name));
                    consumer.setMessageListener(this);
                    this.consumerMap.put(name, consumer);
                }
            } else if (componentPacketEvent.getStatus() == ComponentPacketEvent.DEACTIVATED) {
                MessageConsumer consumer = (MessageConsumer) this.consumerMap.remove(name);
                if (consumer != null) {
                    consumer.close();
                }
            }
            this.topicProducer.send(this.broadcastSession.createObjectMessage(componentPacketEvent));
            log.info("broadcast to internal JMS network: " + componentPacketEvent);
        } catch (JMSException e) {
            log.error("failed to broadcast to the internal JMS network: " + componentPacketEvent, e);
        }
    }

    /**
     * Reacts to a consumer starting or stopping on the broadcast topic. Ignored
     * while this flow is not started.
     *
     * <p>When the consumer has stopped, every remote component recorded for its
     * client id is removed. Otherwise an ACTIVATED event is re-sent for each
     * local component connector.
     *
     * @param consumerAdvisoryEvent the advisory describing the consumer
     */
    public void onEvent(ConsumerAdvisoryEvent consumerAdvisoryEvent) {
        if (!this.started.get()) {
            return;
        }
        ConsumerInfo info = consumerAdvisoryEvent.getInfo();
        if (!info.isStarted()) {
            removeAllPackets(info.getClientId());
            return;
        }
        Iterator it = this.broker.getRegistry().getLocalComponentConnectors().iterator();
        while (it.hasNext()) {
            LocalComponentConnector connector = (LocalComponentConnector) it.next();
            onEvent(new ComponentPacketEvent(connector.getPacket(), ComponentPacketEvent.ACTIVATED));
        }
    }

    /**
     * Sends an exchange by delegating to {@link #doRouting(MessageExchangeImpl)}.
     *
     * @param messageExchangeImpl the exchange to send
     * @throws MessagingException propagated from {@code doRouting}
     */
    @Override // org.servicemix.jbi.nmr.flow.AbstractFlow
    protected void doSend(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        doRouting(messageExchangeImpl);
    }

    /**
     * Sends the exchange as an {@link ObjectMessage} to an inbound queue.
     *
     * <p>For the PROVIDER role the target is the exchange's destination id and the
     * queue suffix is that component's name; otherwise the target is the source id
     * and the queue suffix is its container name.
     *
     * @param messageExchangeImpl the exchange to route
     * @throws MessagingException if the registry has no connector for the target,
     *                            or wrapping a {@link JMSException} from the send
     */
    @Override // org.servicemix.jbi.nmr.flow.AbstractFlow
    public void doRouting(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        boolean provider = messageExchangeImpl.getRole() == MessageExchange.Role.PROVIDER;
        ComponentNameSpace targetId = provider ? messageExchangeImpl.getDestinationId() : messageExchangeImpl.getSourceId();
        ComponentConnector connector = this.broker.getRegistry().getComponentConnector(targetId);
        if (connector == null) {
            throw new MessagingException("No component with id (" + targetId + ") - Couldn't route MessageExchange " + messageExchangeImpl);
        }
        try {
            String queueName = provider
                    ? INBOUND_PREFIX + connector.getComponentNameSpace().getName()
                    : INBOUND_PREFIX + targetId.getContainerName();
            Destination queue = this.inboundSession.createQueue(queueName);
            this.queueProducer.send(queue, this.inboundSession.createObjectMessage(messageExchangeImpl));
        } catch (JMSException e) {
            log.error("Failed to send exchange: " + messageExchangeImpl + " internal JMS Network", e);
            throw new MessagingException(e);
        }
    }

    /**
     * Receives messages from the inbound queues and the broadcast topic.
     *
     * <p>Only {@link ObjectMessage}s with a non-null payload are handled: a
     * {@link ComponentPacketEvent} goes to {@link #processInBoundPacket}, a
     * {@link MessageExchangeImpl} is routed through the superclass. Anything else
     * is ignored. JMS and messaging exceptions are logged, not rethrown.
     *
     * @param message the received message; may be {@code null}
     */
    public void onMessage(Message message) {
        // instanceof is false for null, so this also covers a null message.
        if (!(message instanceof ObjectMessage)) {
            return;
        }
        try {
            Serializable payload = ((ObjectMessage) message).getObject();
            if (payload instanceof ComponentPacketEvent) {
                ComponentPacketEvent event = (ComponentPacketEvent) payload;
                processInBoundPacket(event.getPacket().getComponentNameSpace().getContainerName(), event);
            } else if (payload instanceof MessageExchangeImpl) {
                super.doRouting((MessageExchangeImpl) payload);
            }
        } catch (JMSException e) {
            log.error("Caught an exception unpacking JMS Message: ", e);
        } catch (MessagingException e2) {
            log.error("Caught an exception routing ExchangePacket: ", e2);
        }
    }

    /**
     * Applies a component event received from the network to the local registry.
     * Events whose packet belongs to this broker's own container are ignored;
     * an unrecognised status is logged as a warning.
     *
     * @param str                  key under which the packet is tracked (the caller
     *                             in this class passes the packet's container name)
     * @param componentPacketEvent the received event
     */
    protected void processInBoundPacket(String str, ComponentPacketEvent componentPacketEvent) {
        ComponentPacket packet = componentPacketEvent.getPacket();
        if (packet.getComponentNameSpace().getContainerName().equals(this.broker.getContainerName())) {
            return;
        }
        if (componentPacketEvent.getStatus() == ComponentPacketEvent.ACTIVATED) {
            addRemotePacket(str, packet);
        } else if (componentPacketEvent.getStatus() == ComponentPacketEvent.DEACTIVATED) {
            removeRemotePacket(str, packet);
        } else if (componentPacketEvent.getStatus() == ComponentPacketEvent.STATE_CHANGE) {
            updateRemotePacket(str, packet);
        } else {
            log.warn("Unable to determine ComponentPacketEvent type: " + componentPacketEvent.getStatus() + " for packet: " + packet);
        }
    }

    /**
     * Records a remote component under its node key and adds a remote connector
     * for it to the registry.
     */
    private void addRemotePacket(String str, ComponentPacket componentPacket) {
        this.networkComponentKeyMap.put(componentPacket.getComponentNameSpace(), str);
        Set packets = (Set) this.networkNodeKeyMap.get(str);
        if (packets == null) {
            packets = new CopyOnWriteArraySet();
            this.networkNodeKeyMap.put(str, packets);
        }
        ComponentConnector connector = new ComponentConnector(componentPacket);
        log.info("Adding Remote Component: " + connector);
        this.broker.getRegistry().addRemoteComponentConnector(connector);
        packets.add(componentPacket);
    }

    /**
     * Replaces the stored packet for a remote component (when its node is known)
     * and updates the remote connector in the registry.
     */
    private void updateRemotePacket(String str, ComponentPacket componentPacket) {
        Set packets = (Set) this.networkNodeKeyMap.get(str);
        if (packets != null) {
            // Remove then add so the stored instance is replaced by the new packet.
            packets.remove(componentPacket);
            packets.add(componentPacket);
        }
        ComponentConnector connector = new ComponentConnector(componentPacket);
        log.info("Updating remote Component: " + connector);
        this.broker.getRegistry().updateRemoteComponentConnector(connector);
    }

    /**
     * Forgets a remote component. The registry is only touched when the node key
     * is known; the node entry is dropped once its last packet is gone.
     */
    private void removeRemotePacket(String str, ComponentPacket componentPacket) {
        this.networkComponentKeyMap.remove(componentPacket.getComponentNameSpace());
        Set packets = (Set) this.networkNodeKeyMap.get(str);
        if (packets == null) {
            return;
        }
        packets.remove(componentPacket);
        ComponentConnector connector = new ComponentConnector(componentPacket);
        log.info("Removing remote Component: " + connector);
        this.broker.getRegistry().removeRemoteComponentConnector(connector);
        if (packets.isEmpty()) {
            this.networkNodeKeyMap.remove(str);
        }
    }

    /**
     * Removes every remote component recorded for the given node key, both from
     * the registry and from the local bookkeeping maps.
     */
    private void removeAllPackets(String str) {
        Set<ComponentPacket> packets = (Set) this.networkNodeKeyMap.remove(str);
        if (packets == null) {
            return;
        }
        for (ComponentPacket componentPacket : packets) {
            ComponentConnector connector = new ComponentConnector(componentPacket);
            log.info("Network node: " + str + " Stopped. Removing remote Component: " + connector);
            this.broker.getRegistry().removeRemoteComponentConnector(connector);
            this.networkComponentKeyMap.remove(componentPacket.getComponentNameSpace());
        }
    }
}
