001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.service.impl; 019 020 import java.util.HashMap; 021 import java.util.Iterator; 022 import java.util.Map; 023 024 import javax.jms.JMSException; 025 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.activemq.broker.BrokerClient; 029 import org.activemq.filter.Filter; 030 import org.activemq.message.ConsumerInfo; 031 import org.activemq.message.MessageAck; 032 import org.activemq.service.DeadLetterPolicy; 033 import org.activemq.service.Dispatcher; 034 import org.activemq.service.QueueListEntry; 035 import org.activemq.service.RedeliveryPolicy; 036 import org.activemq.service.TopicMessageContainer; 037 import org.activemq.service.TransactionManager; 038 import org.activemq.service.TransactionTask; 039 040 /** 041 * Represents a durable topic subscription where the consumer has a unique 042 * clientID used to persist the messages across both Broker restarts and 043 * JMS client restarts 044 * 045 * @version $Revision: 1.1.1.1 $ 046 */ 047 public class DurableTopicSubscription extends SubscriptionImpl { 048 049 private static final Log log = LogFactory.getLog(DurableTopicSubscription.class); 050 051 private String persistentKey; 052 053 public DurableTopicSubscription(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) { 054 super(dispatcher, client, info, filter, redeliveryPolicy,deadLetterPolicy); 055 } 056 057 public synchronized void messageConsumed(MessageAck ack) throws JMSException { 058 if (ack.isExpired() || (!ack.isMessageRead() && !isBrowser())) { 059 super.messageConsumed(ack); 060 } 061 else { 062 final Map lastMessagePointersPerContainer = new HashMap(); 063 064 //remove up to this message 065 boolean found = false; 066 QueueListEntry queueEntry = messagePtrs.getFirstEntry(); 067 while (queueEntry != null) { 068 final MessagePointer pointer = (MessagePointer) queueEntry.getElement(); 069 070 messagePtrs.remove(queueEntry); 071 lastMessagePointersPerContainer.put(pointer.getContainer(), pointer); 072 unconsumedMessagesDispatched.decrement(); 073 074 TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(){ 075 public void execute() throws Throwable { 076 unconsumedMessagesDispatched.increment(); 077 MessagePointer p = new MessagePointer(pointer); 078 p.setRedelivered(true); 079 messagePtrs.add(p); 080 dispatch.wakeup(DurableTopicSubscription.this); 081 lastMessageIdentity = pointer.getMessageIdentity(); 082 } 083 }); 084 085 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 086 public void execute() throws Throwable { 087 // now lets tell each container to update its lastAcknowlegedMessageID 088 for (Iterator iter = lastMessagePointersPerContainer.entrySet().iterator(); iter.hasNext();) { 089 Map.Entry entry = (Map.Entry) iter.next(); 090 TopicMessageContainer container = (TopicMessageContainer) entry.getKey(); 091 MessagePointer pointer = (MessagePointer) entry.getValue(); 092 container.setLastAcknowledgedMessageID(DurableTopicSubscription.this, pointer.getMessageIdentity()); 093 } 094 } 095 }); 096 097 if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) { 098 found = true; 099 break; 100 } 101 queueEntry = messagePtrs.getNextEntry(queueEntry); 102 } 103 if (!found) { 104 log.warn("Did not find a matching message for identity: " + ack.getMessageIdentity()); 105 } 106 //System.out.println("Message consumed. Remaining: " + messagePtrs.size() + " unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get()); 107 dispatch.wakeup(this); 108 } 109 } 110 111 public String getPersistentKey() { 112 if (persistentKey == null) { 113 persistentKey = "[" + getClientId() + ":" + getSubscriberName() + "]"; 114 } 115 return persistentKey; 116 } 117 }