View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.boundedvm;
20  
21  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import org.codehaus.activemq.broker.BrokerClient;
24  import org.codehaus.activemq.filter.AndFilter;
25  import org.codehaus.activemq.filter.Filter;
26  import org.codehaus.activemq.filter.FilterFactory;
27  import org.codehaus.activemq.filter.FilterFactoryImpl;
28  import org.codehaus.activemq.filter.NoLocalFilter;
29  import org.codehaus.activemq.message.ActiveMQMessage;
30  import org.codehaus.activemq.message.ConsumerInfo;
31  import org.codehaus.activemq.message.MessageAck;
32  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
33  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
34  import org.codehaus.activemq.service.MessageContainer;
35  import org.codehaus.activemq.service.MessageContainerManager;
36  
37  import javax.jms.JMSException;
38  import java.util.Iterator;
39  
40  /***
41   * A MessageContainerManager for transient topics
42   * 
43   * @version $Revision: 1.6 $
44   */
45  
46  /***
47   * A manager of MessageContainer instances
48   */
49  public class TransientTopicBoundedMessageManager implements MessageContainerManager {
50      private MemoryBoundedQueueManager queueManager;
51      private ConcurrentHashMap containers;
52      private FilterFactory filterFactory;
53      private SynchronizedBoolean started;
54  
55      /***
56       * Constructor for TransientTopicBoundedMessageManager
57       *
58       * @param mgr
59       */
60      public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
61          this.queueManager = mgr;
62          this.containers = new ConcurrentHashMap();
63          this.filterFactory = new FilterFactoryImpl();
64          this.started = new SynchronizedBoolean(false);
65      }
66  
67      /***
68       * start the manager
69       *
70       * @throws JMSException
71       */
72      public void start() throws JMSException {
73          if (started.commit(false, true)) {
74              for (Iterator i = containers.values().iterator(); i.hasNext();) {
75                  TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
76                  container.start();
77              }
78          }
79      }
80  
81      /***
82       * stop the manager
83       *
84       * @throws JMSException
85       */
86      public void stop() throws JMSException {
87          if (started.commit(true, false)) {
88              for (Iterator i = containers.values().iterator(); i.hasNext();) {
89                  TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
90                  container.stop();
91              }
92          }
93      }
94  
95      /***
96       * Add a consumer if appropiate
97       *
98       * @param client
99       * @param info
100      * @throws JMSException
101      */
102     public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
103         if (info.getDestination().isTopic()) {
104             TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
105                     .get(client);
106             if (container == null) {
107                 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
108                 container = new TransientTopicBoundedMessageContainer(client, queue);
109                 containers.put(client, container);
110                 if (started.get()) {
111                     container.start();
112                 }
113             }
114             container.addConsumer(createFilter(info), info);
115         }
116     }
117 
118     /***
119      * @param client
120      * @param info
121      * @throws JMSException
122      */
123     public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
124         if (info.getDestination().isTopic()) {
125             TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
126                     .get(client);
127             if (container != null) {
128                 container.removeConsumer(info);
129                 if (container.isInactive()) {
130                     containers.remove(client);
131                     container.close();
132                 }
133             }
134         }
135     }
136 
137     /***
138      * Delete a durable subscriber
139      *
140      * @param clientId
141      * @param subscriberName
142      * @throws JMSException if the subscriber doesn't exist or is still active
143      */
144     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
145     }
146 
147     /***
148      * @param client
149      * @param message
150      * @throws JMSException
151      */
152     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
153         if (message != null && message.getJMSActiveMQDestination().isTopic()) {
154             for (Iterator i = containers.values().iterator(); i.hasNext();) {
155                 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
156                 container.targetAndDispatch(message.shallowCopy());
157                 //System.out.println("ENQ msg(" + message.getMemoryUsage() + ") total cap = " + queueManager.getCurrentCapacity());
158             }
159         }
160     }
161 
162     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
163     }
164 
165     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
166     }
167 
168     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
169     }
170 
171     public void poll() throws JMSException {
172     }
173 
174     /***
175      * A hook when the transaction is about to be commited; so apply all outstanding commands to the Journal if using a
176      * Journal (transaction log)
177      *
178      * @param client
179      * @param transactionId
180      * @throws JMSException
181      */
182     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
183     }
184 
185     /***
186      * A hook when the transaction is about to be rolled back; so discard all outstanding commands that are pending to
187      * be written to the Journal
188      *
189      * @param client
190      * @param transactionId
191      */
192     public void rollbackTransaction(BrokerClient client, String transactionId) {
193     }
194 
195     public MessageContainer getContainer(String physicalName) {
196         return null;
197     }
198 
199     /***
200      * Create filter for a Consumer
201      *
202      * @param info
203      * @return the Fitler
204      * @throws javax.jms.JMSException
205      */
206     protected Filter createFilter(ConsumerInfo info) throws JMSException {
207         Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
208         if (info.isNoLocal()) {
209             filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
210         }
211         return filter;
212     }
213 }