001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq; 019 020 import java.util.LinkedList; 021 022 import javax.jms.IllegalStateException; 023 import javax.jms.InvalidDestinationException; 024 import javax.jms.JMSException; 025 import javax.jms.Message; 026 import javax.jms.MessageConsumer; 027 import javax.jms.MessageListener; 028 029 import org.activemq.io.util.MemoryBoundedQueue; 030 import org.activemq.management.JMSConsumerStatsImpl; 031 import org.activemq.management.StatsCapable; 032 import org.activemq.management.StatsImpl; 033 import org.activemq.message.ActiveMQDestination; 034 import org.activemq.message.ActiveMQMessage; 035 import org.activemq.selector.SelectorParser; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE> 041 * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer 042 * creation method supplied by a session. 043 * <P> 044 * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers. 045 * <P> 046 * A message consumer can be created with a message selector. A message selector allows the client to restrict the 047 * messages delivered to the message consumer to those that match the selector. 048 * <P> 049 * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver 050 * them as they arrive. 051 * <P> 052 * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE> 053 * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait 054 * for the next message. 055 * <P> 056 * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As 057 * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE> 058 * onMessage</CODE> method. 059 * <P> 060 * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception. 061 * 062 * @version $Revision: 1.1.1.1 $ 063 * @see javax.jms.MessageConsumer 064 * @see javax.jms.QueueReceiver 065 * @see javax.jms.TopicSubscriber 066 * @see javax.jms.Session 067 */ 068 public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, Closeable { 069 private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class); 070 protected ActiveMQSession session; 071 protected String consumerIdentifier; 072 protected MemoryBoundedQueue messageQueue; 073 protected String messageSelector; 074 private MessageListener messageListener; 075 protected String consumerName; 076 protected ActiveMQDestination destination; 077 private boolean closed; 078 protected int consumerNumber; 079 protected int prefetchNumber; 080 protected long startTime; 081 protected boolean noLocal; 082 protected boolean browser; 083 private Thread accessThread; 084 private Object messageListenerGuard; 085 private JMSConsumerStatsImpl stats; 086 087 private boolean running=true; 088 private LinkedList stoppedQueue=new LinkedList(); 089 /** 090 * Create a MessageConsumer 091 * 092 * @param theSession 093 * @param dest 094 * @param name 095 * @param selector 096 * @param cnum 097 * @param prefetch 098 * @param noLocalValue 099 * @param browserValue 100 * @throws JMSException 101 */ 102 protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name, 103 String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException { 104 if (dest == null) { 105 throw new InvalidDestinationException("Do not understand a null destination"); 106 } 107 if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) { 108 //validate that the destination comes from this Connection 109 String physicalName = dest.getPhysicalName(); 110 if (physicalName == null) { 111 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 112 } 113 String clientID = theSession.connection.getInitializedClientID(); 114 if (physicalName.indexOf(clientID) < 0) { 115 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 116 } 117 if (dest.isDeleted()) { 118 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 119 } 120 } 121 dest.incrementConsumerCounter(); 122 if (selector != null) { 123 selector = selector.trim(); 124 if (selector.length() > 0) { 125 // Validate that the selector 126 new SelectorParser().parse(selector); 127 } 128 } 129 this.session = theSession; 130 this.destination = dest; 131 this.consumerName = name; 132 this.messageSelector = selector; 133 134 this.consumerNumber = cnum; 135 this.prefetchNumber = prefetch; 136 this.noLocal = noLocalValue; 137 this.browser = browserValue; 138 this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "." + this.consumerNumber; 139 this.startTime = System.currentTimeMillis(); 140 this.messageListenerGuard = new Object(); 141 this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier); 142 this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest); 143 this.session.addConsumer(this); 144 } 145 146 /** 147 * @return the memory used by the internal queue for this MessageConsumer 148 */ 149 public long getLocalMemoryUsage() { 150 return this.messageQueue.getLocalMemoryUsedByThisQueue(); 151 } 152 153 /** 154 * @return the number of messages enqueued by this consumer awaiting dispatch 155 */ 156 public int size() { 157 return this.messageQueue.size(); 158 } 159 160 161 /** 162 * @return Stats for this MessageConsumer 163 */ 164 public StatsImpl getStats() { 165 return stats; 166 } 167 168 /** 169 * @return Stats for this MessageConsumer 170 */ 171 public JMSConsumerStatsImpl getConsumerStats() { 172 return stats; 173 } 174 175 /** 176 * @return pretty print of this consumer 177 */ 178 public String toString() { 179 return "MessageConsumer: " + consumerIdentifier + "[" + consumerNumber + "]"; 180 } 181 182 /** 183 * @return Returns the prefetchNumber. 184 */ 185 public int getPrefetchNumber() { 186 return prefetchNumber; 187 } 188 189 /** 190 * @param prefetchNumber The prefetchNumber to set. 191 */ 192 public void setPrefetchNumber(int prefetchNumber) { 193 this.prefetchNumber = prefetchNumber; 194 } 195 196 /** 197 * Gets this message consumer's message selector expression. 198 * 199 * @return this message consumer's message selector, or null if no message selector exists for the message consumer 200 * (that is, if the message selector was not set or was set to null or the empty string) 201 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error. 202 */ 203 public String getMessageSelector() throws JMSException { 204 checkClosed(); 205 return this.messageSelector; 206 } 207 208 /** 209 * Gets the message consumer's <CODE>MessageListener</CODE>. 210 * 211 * @return the listener for the message consumer, or null if no listener is set 212 * @throws JMSException if the JMS provider fails to get the message listener due to some internal error. 213 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) 214 */ 215 public MessageListener getMessageListener() throws JMSException { 216 checkClosed(); 217 return this.messageListener; 218 } 219 220 /** 221 * Sets the message consumer's <CODE>MessageListener</CODE>. 222 * <P> 223 * Setting the message listener to null is the equivalent of unsetting the message listener for the message 224 * consumer. 225 * <P> 226 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an 227 * existing listener or the consumer is being used to consume messages synchronously is undefined. 228 * 229 * @param listener the listener to which the messages are to be delivered 230 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error. 231 * @see javax.jms.MessageConsumer#getMessageListener() 232 */ 233 public void setMessageListener(MessageListener listener) throws JMSException { 234 checkClosed(); 235 synchronized (messageListenerGuard) { 236 this.messageListener = listener; 237 if (listener != null) { 238 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC); 239 //messages may already be enqueued 240 ActiveMQMessage msg = null; 241 try { 242 while ((msg = (ActiveMQMessage)messageQueue.dequeueNoWait()) != null) { 243 processMessage(msg); 244 } 245 } 246 catch (InterruptedException ex) { 247 JMSException jmsEx = new JMSException("Interrupted setting message listener"); 248 jmsEx.setLinkedException(ex); 249 throw jmsEx; 250 } 251 } 252 } 253 } 254 255 /** 256 * Receives the next message produced for this message consumer. 257 * <P> 258 * This call blocks indefinitely until a message is produced or until this message consumer is closed. 259 * <P> 260 * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the 261 * transaction commits. 262 * 263 * @return the next message produced for this message consumer, or null if this message consumer is concurrently 264 * closed 265 * @throws JMSException 266 */ 267 public Message receive() throws JMSException { 268 checkClosed(); 269 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC); 270 try { 271 this.accessThread = Thread.currentThread(); 272 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(); 273 this.accessThread = null; 274 if (message != null) { 275 boolean expired = message.isExpired(); 276 messageDelivered(message, true, expired); 277 if (!expired) { 278 message = message.shallowCopy(); 279 } 280 else { 281 message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue 282 } 283 } 284 if( message!=null && log.isDebugEnabled() ) { 285 log.debug("Message received: "+message); 286 } 287 return message; 288 } 289 catch (InterruptedException ioe) { 290 return null; 291 } 292 } 293 294 /** 295 * Receives the next message that arrives within the specified timeout interval. 296 * <P> 297 * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE> 298 * timeout</CODE> of zero never expires, and the call blocks indefinitely. 299 * 300 * @param timeout the timeout value (in milliseconds) 301 * @return the next message produced for this message consumer, or null if the timeout expires or this message 302 * consumer is concurrently closed 303 * @throws JMSException 304 */ 305 public Message receive(long timeout) throws JMSException { 306 checkClosed(); 307 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC); 308 try { 309 if (timeout == 0) { 310 return this.receive(); 311 } 312 this.accessThread = Thread.currentThread(); 313 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout); 314 this.accessThread = null; 315 if (message != null) { 316 boolean expired = message.isExpired(); 317 messageDelivered(message, true, expired); 318 if (!expired) { 319 message = message.shallowCopy(); 320 } 321 else { 322 message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue 323 } 324 } 325 if( message!=null && log.isDebugEnabled() ) { 326 log.debug("Message received: "+message); 327 } 328 return message; 329 } 330 catch (InterruptedException ioe) { 331 return null; 332 } 333 } 334 335 /** 336 * Receives the next message if one is immediately available. 337 * 338 * @return the next message produced for this message consumer, or null if one is not available 339 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error. 340 */ 341 public Message receiveNoWait() throws JMSException { 342 checkClosed(); 343 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC); 344 try { 345 ActiveMQMessage message = null; 346 //iterate through an scrub delivered but expired messages 347 while ((message = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) { 348 boolean expired = message.isExpired(); 349 messageDelivered(message, true, expired); 350 if (!expired) { 351 if( message!=null && log.isDebugEnabled() ) { 352 log.debug("Message received: "+message); 353 } 354 return message.shallowCopy(); 355 } 356 } 357 } 358 catch (InterruptedException ioe) { 359 throw new JMSException("Queue is interrupted: " + ioe.getMessage()); 360 } 361 return null; 362 } 363 364 /** 365 * Closes the message consumer. 366 * <P> 367 * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java 368 * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually 369 * reclaim these resources may not be timely enough. 370 * <P> 371 * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message 372 * consumer <CODE>receive</CODE> call returns null when this message consumer is closed. 373 * 374 * @throws JMSException if the JMS provider fails to close the consumer due to some internal error. 375 */ 376 public void close() throws JMSException { 377 try { 378 this.accessThread.interrupt(); 379 } 380 catch (NullPointerException npe) { 381 } 382 catch (SecurityException se) { 383 } 384 if (destination != null) { 385 destination.decrementConsumerCounter(); 386 } 387 388 this.session.removeConsumer(this); 389 messageQueue.close(); 390 closed = true; 391 } 392 393 /** 394 * @return true if this is a durable topic subscriber 395 */ 396 public boolean isDurableSubscriber() { 397 return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0; 398 } 399 400 /** 401 * @return true if this is a Transient Topic subscriber 402 */ 403 public boolean isTransientSubscriber(){ 404 return this.destination != null && destination.isTopic() && (consumerName == null || consumerName.length() ==0); 405 } 406 407 /** 408 * @throws IllegalStateException 409 */ 410 protected void checkClosed() throws IllegalStateException { 411 if (closed) { 412 throw new IllegalStateException("The Consumer is closed"); 413 } 414 } 415 416 /** 417 * Process a Message - passing either to the queue or message listener 418 * 419 * @param message 420 */ 421 synchronized protected void processMessage(ActiveMQMessage message) { 422 if( !running ) { 423 stoppedQueue.addLast(message); 424 return; 425 } 426 message.setConsumerIdentifer(this.consumerIdentifier); 427 MessageListener listener = null; 428 synchronized (messageListenerGuard) { 429 listener = this.messageListener; 430 } 431 boolean transacted = session.isTransacted(); 432 try { 433 if (!closed) { 434 if (message.getJMSActiveMQDestination() == null) { 435 message.setJMSDestination(getDestination()); 436 } 437 if (listener != null) { 438 beforeMessageDelivered(message); 439 boolean expired = message.isExpired(); 440 if (transacted) { 441 afterMessageDelivered(message, true, expired, true); 442 } 443 if (!expired) { 444 if( log.isDebugEnabled() ) { 445 log.debug("Message delivered to message listener: "+message); 446 } 447 listener.onMessage(message.shallowCopy()); 448 } 449 if (!transacted) { 450 afterMessageDelivered(message, true, expired, true); 451 } 452 } 453 else { 454 this.messageQueue.enqueue(message); 455 } 456 } 457 else { 458 messageDelivered(message, false, false); 459 } 460 } 461 catch (Throwable e) { 462 log.warn("could not process message: " + message + ". Reason: " + e, e); 463 messageDelivered(message, false, false); 464 } 465 } 466 467 /** 468 * @return Returns the consumerId. 469 */ 470 protected String getConsumerIdentifier() { 471 return consumerIdentifier; 472 } 473 474 /** 475 * @return the consumer name - used for durable consumers 476 */ 477 protected String getConsumerName() { 478 return this.consumerName; 479 } 480 481 /** 482 * Set the name of the Consumer - used for durable subscribers 483 * 484 * @param value 485 */ 486 protected void setConsumerName(String value) { 487 this.consumerName = value; 488 } 489 490 /** 491 * @return the locally unique Consumer Number 492 */ 493 protected int getConsumerNumber() { 494 return this.consumerNumber; 495 } 496 497 /** 498 * Set the locally unique consumer number 499 * 500 * @param value 501 */ 502 protected void setConsumerNumber(int value) { 503 this.consumerNumber = value; 504 } 505 506 /** 507 * @return true if this consumer does not accept locally produced messages 508 */ 509 protected boolean isNoLocal() { 510 return this.noLocal; 511 } 512 513 /** 514 * Retrive is a browser 515 * 516 * @return true if a browser 517 */ 518 protected boolean isBrowser() { 519 return this.browser; 520 } 521 522 /** 523 * Set true if only a Browser 524 * 525 * @param value 526 * @see ActiveMQQueueBrowser 527 */ 528 protected void setBrowser(boolean value) { 529 this.browser = value; 530 } 531 532 /** 533 * @return ActiveMQDestination 534 */ 535 protected ActiveMQDestination getDestination() { 536 return this.destination; 537 } 538 539 /** 540 * @return the startTime 541 */ 542 protected long getStartTime() { 543 return startTime; 544 } 545 546 protected void clearMessagesInProgress() { 547 messageQueue.clear(); 548 stoppedQueue.clear(); 549 } 550 551 private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) { 552 afterMessageDelivered(message, messageRead, messageExpired, false); 553 } 554 555 private void beforeMessageDelivered(ActiveMQMessage message) { 556 if (message == null) { 557 return; 558 } 559 boolean topic = destination != null && destination.isTopic(); 560 message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic); 561 this.session.beforeMessageDelivered(message); 562 } 563 564 private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) { 565 if (message == null) { 566 return; 567 } 568 569 boolean consumed = browser ? false : messageRead; 570 ActiveMQDestination destination = message.getJMSActiveMQDestination(); 571 boolean topic = destination != null && destination.isTopic(); 572 message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic); 573 this.session.afterMessageDelivered((isDurableSubscriber() || this.destination.isQueue()), message, consumed, messageExpired, beforeCalled); 574 if (messageRead) { 575 stats.onMessage(message); 576 } 577 578 } 579 580 synchronized public void start() { 581 running=true; 582 while( !stoppedQueue.isEmpty() ) { 583 ActiveMQMessage m = (ActiveMQMessage)stoppedQueue.removeFirst(); 584 processMessage(m); 585 } 586 } 587 588 synchronized public void stop() { 589 running=false; 590 } 591 592 }