1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import javax.jms.IllegalStateException;
21 import javax.jms.InvalidDestinationException;
22 import javax.jms.JMSException;
23 import javax.jms.Message;
24 import javax.jms.MessageConsumer;
25 import javax.jms.MessageListener;
26 import javax.management.j2ee.statistics.Stats;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.codehaus.activemq.management.JMSConsumerStatsImpl;
30 import org.codehaus.activemq.management.StatsCapable;
31 import org.codehaus.activemq.message.ActiveMQDestination;
32 import org.codehaus.activemq.message.ActiveMQMessage;
33 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
34 import org.codehaus.activemq.selector.SelectorParser;
35
36 /***
37 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
38 * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
39 * creation method supplied by a session.
40 * <P>
41 * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
42 * <P>
43 * A message consumer can be created with a message selector. A message selector allows the client to restrict the
44 * messages delivered to the message consumer to those that match the selector.
45 * <P>
46 * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
47 * them as they arrive.
48 * <P>
49 * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
50 * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
51 * for the next message.
52 * <P>
53 * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
54 * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
55 * onMessage</CODE> method.
56 * <P>
57 * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
58 *
59 * @version $Revision: 1.27 $
60 * @see javax.jms.MessageConsumer
61 * @see javax.jms.QueueReceiver
62 * @see javax.jms.TopicSubscriber
63 * @see javax.jms.Session
64 */
65 public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable {
66 private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
67 protected ActiveMQSession session;
68 protected String consumerId;
69 protected MemoryBoundedQueue messageQueue;
70 protected String messageSelector;
71 private MessageListener messageListener;
72 protected String consumerName;
73 protected ActiveMQDestination destination;
74 private boolean closed;
75 protected int consumerNumber;
76 protected int prefetchNumber;
77 protected long startTime;
78 protected boolean noLocal;
79 protected boolean browser;
80 private Thread accessThread;
81 private Object messageListenerGuard;
82 private JMSConsumerStatsImpl stats;
83
84 /***
85 * Create a MessageConsumer
86 *
87 * @param theSession
88 * @param dest
89 * @param name
90 * @param selector
91 * @param cnum
92 * @param prefetch
93 * @param noLocalValue
94 * @param browserValue
95 * @throws JMSException
96 */
97 protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
98 String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
99 if (dest == null) {
100 throw new InvalidDestinationException("Do not understand a null destination");
101 }
102 if (dest.isTemporary()) {
103
104 String physicalName = dest.getPhysicalName();
105 if (physicalName == null) {
106 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
107 }
108 String clientID = theSession.connection.clientID;
109 if (physicalName.indexOf(clientID) < 0) {
110 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
111 }
112 }
113 if (selector != null) {
114 selector = selector.trim();
115 if (selector.length() > 0) {
116
117 new SelectorParser().parse(selector);
118 }
119 }
120 this.session = theSession;
121 this.destination = dest;
122 this.consumerName = name;
123 this.messageSelector = selector;
124
125 this.consumerNumber = cnum;
126 this.prefetchNumber = prefetch;
127 this.noLocal = noLocalValue;
128 this.browser = browserValue;
129 this.startTime = System.currentTimeMillis();
130 this.messageListenerGuard = new Object();
131 String queueName = theSession.connection.clientID + ":" + name;
132 queueName += ":" + cnum;
133 this.messageQueue = theSession.connection.getMemoryBoundedQueue(queueName);
134 this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
135 this.session.addConsumer(this);
136 }
137
138 /***
139 * @return Stats for this MessageConsumer
140 */
141 public Stats getStats() {
142 return stats;
143 }
144
145 /***
146 * @return Stats for this MessageConsumer
147 */
148 public JMSConsumerStatsImpl getConsumerStats() {
149 return stats;
150 }
151
152 /***
153 * @return pretty print of this consumer
154 */
155 public String toString() {
156 return "MessageConsumer: " + consumerId;
157 }
158
159 /***
160 * @return Returns the prefetchNumber.
161 */
162 public int getPrefetchNumber() {
163 return prefetchNumber;
164 }
165
166 /***
167 * @param prefetchNumber The prefetchNumber to set.
168 */
169 public void setPrefetchNumber(int prefetchNumber) {
170 this.prefetchNumber = prefetchNumber;
171 }
172
173 /***
174 * Gets this message consumer's message selector expression.
175 *
176 * @return this message consumer's message selector, or null if no message selector exists for the message consumer
177 * (that is, if the message selector was not set or was set to null or the empty string)
178 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
179 */
180 public String getMessageSelector() throws JMSException {
181 checkClosed();
182 return this.messageSelector;
183 }
184
185 /***
186 * Gets the message consumer's <CODE>MessageListener</CODE>.
187 *
188 * @return the listener for the message consumer, or null if no listener is set
189 * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
190 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
191 */
192 public MessageListener getMessageListener() throws JMSException {
193 checkClosed();
194 return this.messageListener;
195 }
196
197 /***
198 * Sets the message consumer's <CODE>MessageListener</CODE>.
199 * <P>
200 * Setting the message listener to null is the equivalent of unsetting the message listener for the message
201 * consumer.
202 * <P>
203 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
204 * existing listener or the consumer is being used to consume messages synchronously is undefined.
205 *
206 * @param listener the listener to which the messages are to be delivered
207 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
208 * @see javax.jms.MessageConsumer#getMessageListener()
209 */
210 public void setMessageListener(MessageListener listener) throws JMSException {
211 checkClosed();
212 this.messageListener = listener;
213 }
214
215 /***
216 * Receives the next message produced for this message consumer.
217 * <P>
218 * This call blocks indefinitely until a message is produced or until this message consumer is closed.
219 * <P>
220 * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
221 * transaction commits.
222 *
223 * @return the next message produced for this message consumer, or null if this message consumer is concurrently
224 * closed
225 * @throws JMSException
226 */
227 public Message receive() throws JMSException {
228 checkClosed();
229 try {
230 this.accessThread = Thread.currentThread();
231 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
232 this.accessThread = null;
233 messageDelivered(message, true);
234 return message;
235 }
236 catch (InterruptedException ioe) {
237 return null;
238 }
239 }
240
241 /***
242 * Receives the next message that arrives within the specified timeout interval.
243 * <P>
244 * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
245 * timeout</CODE> of zero never expires, and the call blocks indefinitely.
246 *
247 * @param timeout the timeout value (in milliseconds)
248 * @return the next message produced for this message consumer, or null if the timeout expires or this message
249 * consumer is concurrently closed
250 * @throws JMSException
251 */
252 public Message receive(long timeout) throws JMSException {
253 checkClosed();
254 try {
255 if (timeout == 0) {
256 return this.receive();
257 }
258 this.accessThread = Thread.currentThread();
259 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
260 this.accessThread = null;
261
262 messageDelivered(message, true);
263 return message;
264 }
265 catch (InterruptedException ioe) {
266 return null;
267 }
268 }
269
270 /***
271 * Receives the next message if one is immediately available.
272 *
273 * @return the next message produced for this message consumer, or null if one is not available
274 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
275 */
276 public Message receiveNoWait() throws JMSException {
277 checkClosed();
278 if (messageQueue.size() > 0) {
279 try {
280 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
281 messageDelivered(message, true);
282 return message;
283 }
284 catch (InterruptedException ioe) {
285 throw new JMSException("Queue is interrupted: " + ioe.getMessage());
286 }
287 }
288 return null;
289 }
290
291 /***
292 * Closes the message consumer.
293 * <P>
294 * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
295 * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
296 * reclaim these resources may not be timely enough.
297 * <P>
298 * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
299 * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
300 *
301 * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
302 */
303 public void close() throws JMSException {
304 try {
305 this.accessThread.interrupt();
306 }
307 catch (NullPointerException npe) {
308 }
309 catch (SecurityException se) {
310 }
311 this.session.removeConsumer(this);
312 messageQueue.close();
313 closed = true;
314 }
315
316 /***
317 * @return true if this is a durable topic subscriber
318 */
319 public boolean isDurableSubscriber() {
320 return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
321 }
322
323 /***
324 * @throws IllegalStateException
325 */
326 protected void checkClosed() throws IllegalStateException {
327 if (closed) {
328 throw new IllegalStateException("The Consumer is closed");
329 }
330 }
331
332 /***
333 * Process a Message - passing either to the queue or message listener
334 *
335 * @param message
336 */
337 protected void processMessage(ActiveMQMessage message) {
338 message.setConsumerId(this.consumerId);
339 MessageListener listener = null;
340 synchronized (messageListenerGuard) {
341 listener = this.messageListener;
342 }
343 try {
344 if (!closed) {
345 if (listener != null) {
346 listener.onMessage(message);
347 messageDelivered(message, true);
348 }
349 else {
350 this.messageQueue.enqueue(message);
351 }
352 }
353 else {
354 messageDelivered(message, false);
355 }
356 }
357 catch (Exception e) {
358 log.warn("could not process message: " + message, e);
359
360 messageDelivered(message, false);
361
362
363 }
364 }
365
366 /***
367 * @return Returns the consumerId.
368 */
369 protected String getConsumerId() {
370 return consumerId;
371 }
372
373 /***
374 * @param consumerId The consumerId to set.
375 */
376 protected void setConsumerId(String consumerId) {
377 this.consumerId = consumerId;
378 }
379
380 /***
381 * @return the consumer name - used for durable consumers
382 */
383 protected String getConsumerName() {
384 return this.consumerName;
385 }
386
387 /***
388 * Set the name of the Consumer - used for durable subscribers
389 *
390 * @param value
391 */
392 protected void setConsumerName(String value) {
393 this.consumerName = value;
394 }
395
396 /***
397 * @return the locally unique Consumer Number
398 */
399 protected int getConsumerNumber() {
400 return this.consumerNumber;
401 }
402
403 /***
404 * Set the locally unique consumer number
405 *
406 * @param value
407 */
408 protected void setConsumerNumber(int value) {
409 this.consumerNumber = value;
410 }
411
412 /***
413 * @return true if this consumer does not accept locally produced messages
414 */
415 protected boolean isNoLocal() {
416 return this.noLocal;
417 }
418
419 /***
420 * Retrive is a browser
421 *
422 * @return true if a browser
423 */
424 protected boolean isBrowser() {
425 return this.browser;
426 }
427
428 /***
429 * Set true if only a Browser
430 *
431 * @param value
432 * @see ActiveMQQueueBrowser
433 */
434 protected void setBrowser(boolean value) {
435 this.browser = value;
436 }
437
438 /***
439 * @return ActiveMQDestination
440 */
441 protected ActiveMQDestination getDestination() {
442 return this.destination;
443 }
444
445 /***
446 * @return the startTime
447 */
448 protected long getStartTime() {
449 return startTime;
450 }
451
452 private void messageDelivered(ActiveMQMessage message, boolean messageRead) {
453 boolean read = browser ? false : messageRead;
454 this.session.messageDelivered((isDurableSubscriber() || destination.isQueue()),message, read, true);
455 if (messageRead) {
456 stats.onMessage(message);
457 }
458 }
459 }