1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQMessage;
23 import org.codehaus.activemq.message.MessageAck;
24 import org.codehaus.activemq.service.MessageIdentity;
25 import org.codehaus.activemq.service.QueueList;
26 import org.codehaus.activemq.service.QueueListEntry;
27 import org.codehaus.activemq.service.QueueMessageContainer;
28 import org.codehaus.activemq.store.MessageStore;
29 import org.codehaus.activemq.store.PersistenceAdapter;
30 import org.codehaus.activemq.util.Callback;
31 import org.codehaus.activemq.util.TransactionTemplate;
32
33 import javax.jms.JMSException;
34
/**
 * A default implementation of a Durable Queue based
 * {@link org.codehaus.activemq.service.MessageContainer}
 * which acts as an adapter between the {@link org.codehaus.activemq.service.MessageContainerManager}
 * requirements and those of the persistent {@link MessageStore} implementations.
 *
 * @version $Revision: 1.18 $
 */
43 public class DurableQueueMessageContainer implements QueueMessageContainer {
44 private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
45
46 private MessageStore messageStore;
47 private String destinationName;
48
49 /***
50 * messages to be delivered
51 */
52 private QueueList messagesToBeDelivered;
53 /***
54 * messages that have been delivered but not acknowledged
55 */
56 private QueueList deliveredMessages;
57 private PersistenceAdapter persistenceAdapter;
58 private TransactionTemplate transactionTemplate;
59
60 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName) {
61 this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
62 }
63
64 public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore, String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
65 this.persistenceAdapter = persistenceAdapter;
66 this.messageStore = messageStore;
67 this.destinationName = destinationName;
68 this.messagesToBeDelivered = messagesToBeDelivered;
69 this.deliveredMessages = deliveredMessages;
70 this.transactionTemplate = new TransactionTemplate(persistenceAdapter);
71 }
72
73 public String getDestinationName() {
74 return destinationName;
75 }
76
77 public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
78 MessageIdentity answer = messageStore.addMessage(message);
79 messagesToBeDelivered.add(answer);
80 return answer;
81
82 }
83
84 public synchronized void delete(MessageIdentity messageID, MessageAck ack) throws JMSException {
85 boolean found = false;
86 QueueListEntry entry = deliveredMessages.getFirstEntry();
87 while (entry != null) {
88 MessageIdentity identity = (MessageIdentity) entry.getElement();
89 if (messageID.equals(identity)) {
90 deliveredMessages.remove(entry);
91
92
93
94 messageStore.removeMessage(identity, ack);
95 found = true;
96 break;
97 }
98 entry = deliveredMessages.getNextEntry(entry);
99 }
100 if (!found) {
101
102
103 entry = messagesToBeDelivered.getFirstEntry();
104 while (entry != null) {
105 MessageIdentity identity = (MessageIdentity) entry.getElement();
106 if (messageID.equals(identity)) {
107 messagesToBeDelivered.remove(entry);
108
109
110
111 messageStore.removeMessage(identity, ack);
112 found = true;
113 break;
114 }
115 entry = messagesToBeDelivered.getNextEntry(entry);
116 }
117
118 if (!found) {
119 log.error("Attempt to acknowledge unknown messageID: " + messageID);
120 deliveredMessages.remove(messageID);
121 }
122 }
123 }
124
125 public synchronized ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
126 return messageStore.getMessage(messageID);
127 }
128
129
130 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
131 /*** TODO: make more optimal implementation */
132 return getMessage(messageIdentity) != null;
133 }
134
135 /***
136 * Does nothing since when we receive an acknowledgement on a queue
137 * we can delete the message
138 *
139 * @param messageIdentity
140 */
141 public void registerMessageInterest(MessageIdentity messageIdentity) {
142 }
143
144 /***
145 * Does nothing since when we receive an acknowledgement on a queue
146 * we can delete the message
147 *
148 * @param messageIdentity
149 * @param ack
150 */
151 public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) {
152 }
153
154 public synchronized ActiveMQMessage poll() throws JMSException {
155 ActiveMQMessage message = null;
156 MessageIdentity messageIdentity = (MessageIdentity) messagesToBeDelivered.removeFirst();
157 if (messageIdentity != null) {
158 message = messageStore.getMessage(messageIdentity);
159 deliveredMessages.add(messageIdentity);
160 }
161 return message;
162 }
163
164 public synchronized ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
165 ActiveMQMessage answer = null;
166 if (messageID == null) {
167 MessageIdentity identity = (MessageIdentity) messagesToBeDelivered.getFirst();
168 if (identity != null) {
169 answer = messageStore.getMessage(identity);
170 }
171 }
172 else {
173 int index = messagesToBeDelivered.indexOf(messageID);
174 if (index >= 0 && (index + 1) < messagesToBeDelivered.size()) {
175 messageID = (MessageIdentity) messagesToBeDelivered.get(index + 1);
176 if (messageID != null) {
177 answer = messageStore.getMessage(messageID);
178 }
179 }
180 }
181 return answer;
182 }
183
184
185 public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
186 boolean result = deliveredMessages.remove(messageIdentity);
187 messagesToBeDelivered.addFirst(messageIdentity);
188 }
189
190 /***
191 * called to reset dispatch pointers if a new Message Consumer joins
192 *
193 * @throws javax.jms.JMSException
194 */
195 public synchronized void reset() throws JMSException {
196
197 int count = 0;
198 MessageIdentity messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
199 while (messageIdentity != null) {
200 messagesToBeDelivered.add(count++, messageIdentity);
201 messageIdentity = (MessageIdentity) deliveredMessages.removeFirst();
202 }
203 }
204
205 public synchronized void start() throws JMSException {
206 final QueueMessageContainer container = this;
207 transactionTemplate.run(new Callback() {
208 public void execute() throws Throwable {
209 messageStore.start();
210 messageStore.recover(container);
211 }
212 });
213
214 }
215
216 public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
217 messagesToBeDelivered.add(messageIdentity);
218 }
219
220 public void stop() throws JMSException {
221 messageStore.stop();
222 }
223 }