View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.filter.Filter;
23  import org.codehaus.activemq.message.ConsumerInfo;
24  import org.codehaus.activemq.message.MessageAck;
25  import org.codehaus.activemq.service.Dispatcher;
26  import org.codehaus.activemq.service.MessageContainer;
27  import org.codehaus.activemq.service.QueueListEntry;
28  import org.codehaus.activemq.service.TopicMessageContainer;
29  
30  import javax.jms.JMSException;
31  import java.util.HashMap;
32  import java.util.Iterator;
33  import java.util.Map;
34  
35  /***
36   * Represents a durable topic subscription where the consumer has a unique
37   * clientID used to persist the messages across both Broker restarts and
38   * JMS client restarts
39   *
40   * @version $Revision: 1.8 $
41   */
42  public class DurableTopicSubscription extends SubscriptionImpl {
43  
44      private static final Log log = LogFactory.getLog(DurableTopicSubscription.class);
45  
46      private String persistentKey;
47  
48      public DurableTopicSubscription(Dispatcher dispatcher, ConsumerInfo info, Filter filter) {
49          super(dispatcher, info, filter);
50      }
51  
52      public synchronized void messageConsumed(MessageAck ack) throws JMSException {
53          if (!ack.isMessageRead() && !isBrowser()) {
54              super.messageConsumed(ack);
55          }
56          else {
57              Map lastMessagePointersPerContainer = new HashMap();
58  
59              //remove up to this message
60              boolean found = false;
61              QueueListEntry queueEntry = messagePtrs.getFirstEntry();
62              while (queueEntry != null) {
63                  MessagePointer pointer = (MessagePointer) queueEntry.getElement();
64  
65                  messagePtrs.remove(queueEntry);
66                  lastMessagePointersPerContainer.put(pointer.getContainer(), pointer);
67                  unconsumedMessagesDispatched.decrement();
68  
69                  if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
70                      found = true;
71                      break;
72                  }
73                  queueEntry = messagePtrs.getNextEntry(queueEntry);
74              }
75              if (!found) {
76                  log.warn("Did not find a matching message for identity: " + ack.getMessageIdentity());
77              }
78  
79              // now lets tell each container to update its lastAcknowlegedMessageID
80              for (Iterator iter = lastMessagePointersPerContainer.entrySet().iterator(); iter.hasNext();) {
81                  Map.Entry entry = (Map.Entry) iter.next();
82                  TopicMessageContainer container = (TopicMessageContainer) entry.getKey();
83                  MessagePointer pointer = (MessagePointer) entry.getValue();
84                  container.setLastAcknowledgedMessageID(this, pointer.getMessageIdentity());
85              }
86  
87              //System.out.println("Message consumed. Remaining: " + messagePtrs.size() + " unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get());
88              dispatch.wakeup(this);
89          }
90      }
91  
92      public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException {
93          // rollback is rare so we don't have to be super efficient here
94          MessagePointer pointer = new MessagePointer(container, ack.getMessageIdentity());
95          messagePtrs.add(pointer);
96          unconsumedMessagesDispatched.increment();
97      }
98  
99      public String getPersistentKey() {
100         if (persistentKey == null) {
101             persistentKey = "[" + getClientId() + ":" + getSubscriberName() + "]";
102         }
103         return persistentKey;
104     }
105 }