1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.service.impl; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 import org.codehaus.activemq.broker.BrokerClient; 23 import org.codehaus.activemq.filter.FilterFactory; 24 import org.codehaus.activemq.filter.FilterFactoryImpl; 25 import org.codehaus.activemq.message.ActiveMQDestination; 26 import org.codehaus.activemq.message.ActiveMQMessage; 27 import org.codehaus.activemq.message.ConsumerInfo; 28 import org.codehaus.activemq.service.Dispatcher; 29 import org.codehaus.activemq.service.MessageContainer; 30 import org.codehaus.activemq.service.Subscription; 31 import org.codehaus.activemq.service.SubscriptionContainer; 32 import org.codehaus.activemq.store.PersistenceAdapter; 33 34 import javax.jms.DeliveryMode; 35 import javax.jms.JMSException; 36 import java.util.Iterator; 37 38 /*** 39 * A default implementation of a Broker of Topic messages for transient consumers 40 * 41 * @version $Revision: 1.3 $ 42 */ 43 public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager { 44 private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class); 45 46 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) { 47 this(persistenceAdapter, new SubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl()); 48 } 49 50 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) { 51 super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher); 52 } 53 54 /*** 55 * @param client 56 * @param info 57 * @throws javax.jms.JMSException 58 */ 59 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 60 if (info.getDestination().isTopic()) { 61 doAddMessageConsumer(client, info); 62 } 63 } 64 65 66 /*** 67 * @param client 68 * @param info 69 * @throws javax.jms.JMSException 70 */ 71 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 72 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId()); 73 if (sub != null) { 74 sub.setActive(false); 75 dispatcher.removeActiveSubscription(client, sub); 76 subscriptionContainer.removeSubscription(info.getConsumerId()); 77 sub.clear(); 78 } 79 } 80 81 82 /*** 83 * @param client 84 * @param message 85 * @throws javax.jms.JMSException 86 */ 87 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException { 88 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination(); 89 if (dest != null && dest.isTopic()) { 90 MessageContainer container = null; 91 if (log.isDebugEnabled()) { 92 log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message); 93 } 94 for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) { 95 Subscription sub = (Subscription) i.next(); 96 if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) { 97 if (container == null) { 98 container = getContainer(message.getJMSDestination().toString()); 99 container.addMessage(message); 100 } 101 sub.addMessage(container, message); 102 } 103 } 104 } 105 } 106 107 /*** 108 * Delete a durable subscriber 109 * 110 * @param clientId 111 * @param subscriberName 112 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active 113 */ 114 public void deleteSubscription(String clientId, String subscriberName) throws JMSException { 115 } 116 }