001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.Map;
023    
024    import javax.jms.JMSException;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.activemq.broker.BrokerClient;
029    import org.activemq.filter.Filter;
030    import org.activemq.message.ConsumerInfo;
031    import org.activemq.message.MessageAck;
032    import org.activemq.service.DeadLetterPolicy;
033    import org.activemq.service.Dispatcher;
034    import org.activemq.service.QueueListEntry;
035    import org.activemq.service.RedeliveryPolicy;
036    import org.activemq.service.TopicMessageContainer;
037    import org.activemq.service.TransactionManager;
038    import org.activemq.service.TransactionTask;
039    
040    /**
041     * Represents a durable topic subscription where the consumer has a unique
042     * clientID used to persist the messages across both Broker restarts and
043     * JMS client restarts
044     *
045     * @version $Revision: 1.1.1.1 $
046     */
047    public class DurableTopicSubscription extends SubscriptionImpl {
048    
049        private static final Log log = LogFactory.getLog(DurableTopicSubscription.class);
050    
051        private String persistentKey;
052    
053        public DurableTopicSubscription(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
054            super(dispatcher, client, info, filter, redeliveryPolicy,deadLetterPolicy);
055        }
056    
057        public synchronized void messageConsumed(MessageAck ack) throws JMSException {
058            if (ack.isExpired() || (!ack.isMessageRead() && !isBrowser())) {
059                super.messageConsumed(ack);
060            }
061            else {
062                final Map lastMessagePointersPerContainer = new HashMap();
063    
064                //remove up to this message
065                boolean found = false;
066                QueueListEntry queueEntry = messagePtrs.getFirstEntry();
067                while (queueEntry != null) {
068                    final MessagePointer pointer = (MessagePointer) queueEntry.getElement();
069    
070                    messagePtrs.remove(queueEntry);
071                    lastMessagePointersPerContainer.put(pointer.getContainer(), pointer);
072                    unconsumedMessagesDispatched.decrement();
073    
074                    TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(){
075                        public void execute() throws Throwable {                        
076                            unconsumedMessagesDispatched.increment();
077                            MessagePointer p = new MessagePointer(pointer);
078                            p.setRedelivered(true);
079                            messagePtrs.add(p);
080                            dispatch.wakeup(DurableTopicSubscription.this);
081                            lastMessageIdentity = pointer.getMessageIdentity();
082                        }
083                    });
084    
085                    TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
086                        public void execute() throws Throwable {                        
087                            // now lets tell each container to update its lastAcknowlegedMessageID
088                            for (Iterator iter = lastMessagePointersPerContainer.entrySet().iterator(); iter.hasNext();) {
089                                Map.Entry entry = (Map.Entry) iter.next();
090                                TopicMessageContainer container = (TopicMessageContainer) entry.getKey();
091                                MessagePointer pointer = (MessagePointer) entry.getValue();
092                                container.setLastAcknowledgedMessageID(DurableTopicSubscription.this, pointer.getMessageIdentity());
093                            }
094                        }
095                    });
096                    
097                    if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
098                        found = true;
099                        break;
100                    }
101                    queueEntry = messagePtrs.getNextEntry(queueEntry);
102                }
103                if (!found) {
104                    log.warn("Did not find a matching message for identity: " + ack.getMessageIdentity());
105                }
106                //System.out.println("Message consumed. Remaining: " + messagePtrs.size() + " unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get());
107                dispatch.wakeup(this);
108            }
109        }
110    
111        public String getPersistentKey() {
112            if (persistentKey == null) {
113                persistentKey = "[" + getClientId() + ":" + getSubscriberName() + "]";
114            }
115            return persistentKey;
116        }
117    }