View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.filter.Filter;
24  import org.codehaus.activemq.message.ActiveMQDestination;
25  import org.codehaus.activemq.message.ActiveMQMessage;
26  import org.codehaus.activemq.message.ConsumerInfo;
27  import org.codehaus.activemq.message.MessageAck;
28  import org.codehaus.activemq.service.Dispatcher;
29  import org.codehaus.activemq.service.MessageContainer;
30  import org.codehaus.activemq.service.MessageIdentity;
31  import org.codehaus.activemq.service.QueueList;
32  import org.codehaus.activemq.service.QueueListEntry;
33  import org.codehaus.activemq.service.Subscription;
34  
35  import javax.jms.JMSException;
36  import java.util.ArrayList;
37  import java.util.List;
38  
39  /***
40   * A Subscription holds messages to be dispatched to a a Client Consumer
41   *
42   * @version $Revision: 1.17 $
43   */
44  public class SubscriptionImpl implements Subscription {
45      private static final Log log = LogFactory.getLog(SubscriptionImpl.class);
46  
47      private String clientId;
48      private String subscriberName;
49      private ActiveMQDestination destination;
50      private String selector;
51      private int prefetchLimit;
52      private boolean noLocal;
53      private boolean active;
54      private int consumerNumber;
55      private String consumerId;
56      private boolean browser;
57      protected Dispatcher dispatch;
58      private MessageIdentity lastMessageIdentity;
59      Filter filter;
60      protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
61      QueueList messagePtrs = new DefaultQueueList();
62      private boolean usePrefetch = false;
63  
64      /***
65       * Create a Subscription object that holds messages to be dispatched to a Consumer
66       *
67       * @param dispatcher
68       * @param info
69       * @param filter
70       */
71      public SubscriptionImpl(Dispatcher dispatcher, ConsumerInfo info, Filter filter) {
72          this.dispatch = dispatcher;
73          this.filter = filter;
74          setActiveConsumer(info);
75      }
76  
77      /***
78       * Set the active consumer info
79       *
80       * @param info
81       */
82      public void setActiveConsumer(ConsumerInfo info) {
83          if (info != null) {
84              this.clientId = info.getClientId();
85              this.subscriberName = info.getConsumerName();
86              this.noLocal = info.isNoLocal();
87              this.destination = info.getDestination();
88              this.selector = info.getSelector();
89              this.prefetchLimit = info.getPrefetchNumber();
90              this.consumerNumber = info.getConsumerNo();
91              this.consumerId = info.getConsumerId();
92              this.browser = info.isBrowser();
93          }
94      }
95  
96      /***
97       * @return pretty print of the Subscription
98       */
99      public String toString() {
100         String str = "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": " + subscriberName + " : "
101                 + destination;
102         return str;
103     }
104 
105     /***
106      * Called when the Subscription is discarded
107      *
108      * @throws JMSException
109      */
110     public synchronized void clear() throws JMSException {
111         QueueListEntry entry = messagePtrs.getFirstEntry();
112         while (entry != null) {
113             MessagePointer pointer = (MessagePointer) entry.getElement();
114             pointer.clear();
115             entry = messagePtrs.getNextEntry(entry);
116         }
117         messagePtrs.clear();
118     }
119 
120     /***
121      * Called when an active subscriber has closed. This resets all MessagePtrs
122      */
123     public synchronized void reset() throws JMSException {
124         QueueListEntry entry = messagePtrs.getFirstEntry();
125         while (entry != null) {
126             MessagePointer pointer = (MessagePointer) entry.getElement();
127             if (pointer.isDispatched()) {
128                 pointer.reset();
129             }
130             else {
131                 break;
132             }
133             entry = messagePtrs.getNextEntry(entry);
134         }
135     }
136 
137     /***
138      * @return Returns the clientId.
139      */
140     public String getClientId() {
141         return clientId;
142     }
143 
144     /***
145      * @param clientId The clientId to set.
146      */
147     public void setClientId(String clientId) {
148         this.clientId = clientId;
149     }
150 
151     /***
152      * @return Returns the filter.
153      */
154     public Filter getFilter() {
155         return filter;
156     }
157 
158     /***
159      * @param filter The filter to set.
160      */
161     public void setFilter(Filter filter) {
162         this.filter = filter;
163     }
164 
165     public boolean isWildcard() {
166         return filter.isWildcard();
167     }
168 
169     public String getPersistentKey() {
170         // not required other than for persistent topic subscriptions
171         return null;
172     }
173 
174     public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
175         if (isDurableTopic()) {
176             return equal(clientId, info.getClientId()) && equal(subscriberName, info.getConsumerName());
177         }
178         return false;
179     }
180 
181     /***
182      * @return Returns the noLocal.
183      */
184     public boolean isNoLocal() {
185         return noLocal;
186     }
187 
188     /***
189      * @param noLocal The noLocal to set.
190      */
191     public void setNoLocal(boolean noLocal) {
192         this.noLocal = noLocal;
193     }
194 
195     /***
196      * @return Returns the subscriberName.
197      */
198     public String getSubscriberName() {
199         return subscriberName;
200     }
201 
202     /***
203      * @param subscriberName The subscriberName to set.
204      */
205     public void setSubscriberName(String subscriberName) {
206         this.subscriberName = subscriberName;
207     }
208 
209     /***
210      * determines if the Subscription is interested in the message
211      *
212      * @param message
213      * @return true if this Subscription will accept the message
214      * @throws JMSException
215      */
216     public boolean isTarget(ActiveMQMessage message) throws JMSException {
217         boolean result = false;
218         if (message != null) {
219             result = filter.matches(message);
220 
221             // lets check that we don't have no-local enabled
222             if (noLocal && result) {
223                 if (clientIDsEqual(message)) {
224                     result = false;
225                 }
226             }
227         }
228         return result;
229     }
230 
231 
232     /***
233      * If the Subscription is a target for the message, the subscription will add a reference to the message and
234      * register an interest in the message to the container
235      *
236      * @param container
237      * @param message
238      * @throws JMSException
239      */
240     public synchronized void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException {
241         //log.info("######  Adding to subscription: " + this + " message: " + message);
242         if (log.isDebugEnabled()) {
243             log.debug("Adding to subscription: " + this + " message: " + message);
244         }
245         MessagePointer pointer = new MessagePointer(container, message.getJMSMessageIdentity());
246 
247         messagePtrs.add(pointer);
248         dispatch.wakeup(this);
249         lastMessageIdentity = message.getJMSMessageIdentity();
250     }
251 
252     /***
253      * Indicates a message has been delivered to a MessageConsumer
254      *
255      * @param ack
256      * @throws JMSException
257      */
258     public synchronized void messageConsumed(MessageAck ack) throws JMSException {
259         doMessageConsume(ack, true);
260     }
261 
262     public synchronized void onAcknowledgeTransactedMessageBeforeCommit(MessageAck ack) throws JMSException {
263         doMessageConsume(ack, false);
264     }
265 
266     public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException {
267         QueueListEntry entry = messagePtrs.getFirstEntry();
268         while (entry != null) {
269             MessagePointer pointer = (MessagePointer) entry.getElement();
270             if (pointer.getMessageIdentity().getMessageID().equals(ack.getMessageID())) {
271                 break;
272             }
273             entry = messagePtrs.getNextEntry(entry);
274         }
275         if (entry != null) {
276             MessagePointer pointer = (MessagePointer) entry.getElement();
277             if (pointer != null) {
278                 unconsumedMessagesDispatched.increment();
279 
280                 //System.out.println("Incremented unconsumed count to: " + unconsumedMessagesDispatched.get());
281 
282                 pointer.reset();
283                 dispatch.wakeup(this);
284             }
285         }
286     }
287 
288     /***
289      * Retrieve messages to dispatch
290      *
291      * @return the messages to dispatch
292      * @throws JMSException
293      */
294     public synchronized ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
295         if (usePrefetch) {
296             return getMessagesWithPrefetch();
297         }
298         List tmpList = new ArrayList();
299         QueueListEntry entry = messagePtrs.getFirstEntry();
300         while (entry != null) {
301             MessagePointer pointer = (MessagePointer) entry.getElement();
302             if (!pointer.isDispatched()) {
303                 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
304                 if (msg != null) {
305                     pointer.setDispatched(true);
306                     tmpList.add(msg);
307                 }
308                 else {
309                     //the message is probably expired
310                     log.info("Message probably expired: " + msg);
311                     QueueListEntry discarded = entry;
312                     entry = messagePtrs.getPrevEntry(discarded);
313                     messagePtrs.remove(discarded);
314                 }
315             }
316             entry = messagePtrs.getNextEntry(entry);
317         }
318         ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
319         return (ActiveMQMessage[]) tmpList.toArray(messages);
320     }
321 
322     protected synchronized ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
323         List tmpList = new ArrayList();
324         QueueListEntry entry = messagePtrs.getFirstEntry();
325         int count = 0;
326         int maxNumberToDispatch = prefetchLimit - unconsumedMessagesDispatched.get();
327 
328         while (entry != null && count < maxNumberToDispatch) {
329             MessagePointer pointer = (MessagePointer) entry.getElement();
330             if (!pointer.isDispatched()) {
331                 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
332                 if (msg != null) {
333                     pointer.setDispatched(true);
334                     tmpList.add(msg);
335                     unconsumedMessagesDispatched.increment();
336                     count++;
337                 }
338                 else {
339                     //the message is probably expired
340                     log.info("Message probably expired: " + msg);
341                     QueueListEntry discarded = entry;
342                     entry = messagePtrs.getPrevEntry(discarded);
343                     messagePtrs.remove(discarded);
344                 }
345             }
346             entry = messagePtrs.getNextEntry(entry);
347         }
348         /***
349          if (tmpList.isEmpty() && ! messagePtrs.isEmpty()) {
350          System.out.println("### Nothing to dispatch but messagePtrs still has: " + messagePtrs.size() + " to dispatch, prefetchLimit: " + prefetchLimit + " unconsumedMessagesDispatched: "  + unconsumedMessagesDispatched.get() + " maxNumberToDispatch: " + maxNumberToDispatch);
351          MessagePointer first = (MessagePointer) messagePtrs.getFirst();
352          System.out.println("###     First: " + first + " dispatched: " + first.isDispatched() + " id: " + first.getMessageIdentity());
353          }
354          else {
355          if (! tmpList.isEmpty()) {
356          System.out.println("### dispatching: " + tmpList.size() + " items = " + tmpList);
357          }
358          }
359          */
360         ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
361         return (ActiveMQMessage[]) tmpList.toArray(messages);
362     }
363 
364     /***
365      * Indicates the Subscription it's reached it's pre-fetch limit
366      *
367      * @return true/false
368      * @throws JMSException
369      */
370     public synchronized boolean isAtPrefetchLimit() throws JMSException {
371         if (usePrefetch) {
372             int underlivedMessageCount = messagePtrs.size() - unconsumedMessagesDispatched.get();
373             return underlivedMessageCount >= prefetchLimit;
374         }
375         else {
376             return false;
377         }
378     }
379 
380     /***
381      * Indicates if this Subscription has more messages to send to the Consumer
382      *
383      * @return true if more messages available to dispatch
384      */
385     public synchronized boolean isReadyToDispatch() throws JMSException {
386         /*** TODO we may have dispatched messags inside messagePtrs */
387         boolean answer = active && messagePtrs.size() > 0;
388         return answer;
389     }
390 
391     /***
392      * @return Returns the destination.
393      */
394     public ActiveMQDestination getDestination() {
395         return destination;
396     }
397 
398     /***
399      * @return Returns the selector.
400      */
401     public String getSelector() {
402         return selector;
403     }
404 
405     /***
406      * @return Returns the active.
407      */
408     public synchronized boolean isActive() {
409         return active;
410     }
411 
412     /***
413      * @param active The active to set.
414      */
415     public synchronized void setActive(boolean active) throws JMSException {
416         this.active = active;
417         if (!active) {
418             reset();
419         }
420     }
421 
422     /***
423      * @return Returns the consumerNumber.
424      */
425     public int getConsumerNumber() {
426         return consumerNumber;
427     }
428 
429     /***
430      * @return the consumer Id for the active consumer
431      */
432     public String getConsumerId() {
433         return consumerId;
434     }
435 
436     /***
437      * Indicates the Subscriber is a Durable Subscriber
438      *
439      * @return true if the subscriber is a durable topic
440      * @throws JMSException
441      */
442     public boolean isDurableTopic() throws JMSException {
443         return destination.isTopic() && subscriberName != null && subscriberName.length() > 0;
444     }
445 
446     /***
447      * Indicates the consumer is a browser only
448      *
449      * @return true if a Browser
450      * @throws JMSException
451      */
452     public boolean isBrowser() throws JMSException {
453         return browser;
454     }
455 
456     public MessageIdentity getLastMessageIdentity() throws JMSException {
457         return lastMessageIdentity;
458     }
459 
460     public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
461         this.lastMessageIdentity = messageIdentity;
462     }
463 
464 
465     /***
466      * Consume a message. If we are inside a transaction then we just update the consumed messages
467      * dispatched counter and we don't actually remove the message until a future call.
468      *
469      * @param ack    the ack command
470      * @param remove whether we should actually remove the message (i.e. really consume the message) or should
471      *               we just update the counters for the dispatcher / prefetch logic to work
472      */
473     protected synchronized void doMessageConsume(MessageAck ack, boolean remove) throws JMSException {
474         //remove up to this message
475         int count = 0;
476         boolean found = false;
477         QueueListEntry entry = messagePtrs.getFirstEntry();
478         while (entry != null) {
479             MessagePointer pointer = (MessagePointer) entry.getElement();
480             if (remove) {
481                 messagePtrs.remove(entry);
482                 if (ack.isMessageRead() && !browser) {
483                     pointer.delete(ack);//delete message from the container (if possible)
484                 }
485             }
486             count++;
487 
488             // in transactions, we decrement on the first call and then don't decrement
489             // the second call when we really remove the pointer
490             if (remove && !ack.isPartOfTransaction()) {
491                 unconsumedMessagesDispatched.decrement();
492             }
493             if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
494                 if (!remove && ack.isPartOfTransaction()) {
495                     // only decrement by one on the last message
496                     // as we will be keeping around all the unconsumedMessages
497                     unconsumedMessagesDispatched.decrement();
498                 }
499                 found = true;
500                 break;
501             }
502             entry = messagePtrs.getNextEntry(entry);
503         }
504         if (!found) {
505             log.warn("Did not find a matching message for identity: " + ack.getMessageIdentity());
506         }
507         dispatch.wakeup(this);
508     }
509 
510     protected boolean clientIDsEqual(ActiveMQMessage message) {
511         String msgClientID = message.getJMSClientID();
512         String producerClientID = message.getProducerID();
513         String subClientID = clientId;
514         if (producerClientID != null && producerClientID.equals(subClientID)) {
515             return true;
516         }
517         else if (msgClientID == subClientID) {
518             return true;
519         }
520         else if (msgClientID == null || subClientID == null) {
521             return false;
522         }
523         else {
524             return msgClientID.equals(subClientID);
525         }
526     }
527 
528     protected static final boolean equal(Object left, Object right) {
529         return left == right || (left != null && right != null && left.equals(right));
530     }
531 
532 }