001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq;
019    
020    import java.util.LinkedList;
021    
022    import javax.jms.IllegalStateException;
023    import javax.jms.InvalidDestinationException;
024    import javax.jms.JMSException;
025    import javax.jms.Message;
026    import javax.jms.MessageConsumer;
027    import javax.jms.MessageListener;
028    
029    import org.activemq.io.util.MemoryBoundedQueue;
030    import org.activemq.management.JMSConsumerStatsImpl;
031    import org.activemq.management.StatsCapable;
032    import org.activemq.management.StatsImpl;
033    import org.activemq.message.ActiveMQDestination;
034    import org.activemq.message.ActiveMQMessage;
035    import org.activemq.selector.SelectorParser;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
041     * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
042     * creation method supplied by a session.
043     * <P>
044     * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
045     * <P>
046     * A message consumer can be created with a message selector. A message selector allows the client to restrict the
047     * messages delivered to the message consumer to those that match the selector.
048     * <P>
049     * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
050     * them as they arrive.
051     * <P>
052     * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
053     * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
054     * for the next message.
055     * <P>
056     * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
057     * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
058     * onMessage</CODE> method.
059     * <P>
060     * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
061     *
062     * @version $Revision: 1.1.1.1 $
063     * @see javax.jms.MessageConsumer
064     * @see javax.jms.QueueReceiver
065     * @see javax.jms.TopicSubscriber
066     * @see javax.jms.Session
067     */
068    public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, Closeable {
069        private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
070        protected ActiveMQSession session;
071        protected String consumerIdentifier;
072        protected MemoryBoundedQueue messageQueue;
073        protected String messageSelector;
074        private MessageListener messageListener;
075        protected String consumerName;
076        protected ActiveMQDestination destination;
077        private boolean closed;
078        protected int consumerNumber;
079        protected int prefetchNumber;
080        protected long startTime;
081        protected boolean noLocal;
082        protected boolean browser;
083        private Thread accessThread;
084        private Object messageListenerGuard;
085        private JMSConsumerStatsImpl stats;
086        
087        private boolean running=true;
088        private LinkedList stoppedQueue=new LinkedList(); 
089        /**
090         * Create a MessageConsumer
091         *
092         * @param theSession
093         * @param dest
094         * @param name
095         * @param selector
096         * @param cnum
097         * @param prefetch
098         * @param noLocalValue
099         * @param browserValue
100         * @throws JMSException
101         */
102        protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
103                                          String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
104            if (dest == null) {
105                throw new InvalidDestinationException("Do not understand a null destination");
106            }
107            if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) {
108                //validate that the destination comes from this Connection
109                String physicalName = dest.getPhysicalName();
110                if (physicalName == null) {
111                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
112                }
113                String clientID = theSession.connection.getInitializedClientID();
114                if (physicalName.indexOf(clientID) < 0) {
115                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
116                }
117                if (dest.isDeleted()) {
118                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
119                }
120            }
121            dest.incrementConsumerCounter();
122            if (selector != null) {
123                selector = selector.trim();
124                if (selector.length() > 0) {
125                    // Validate that the selector
126                    new SelectorParser().parse(selector);
127                }
128            }
129            this.session = theSession;
130            this.destination = dest;
131            this.consumerName = name;
132            this.messageSelector = selector;
133    
134            this.consumerNumber = cnum;
135            this.prefetchNumber = prefetch;
136            this.noLocal = noLocalValue;
137            this.browser = browserValue;
138            this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "." + this.consumerNumber;
139            this.startTime = System.currentTimeMillis();
140            this.messageListenerGuard = new Object();
141            this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier);
142            this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
143            this.session.addConsumer(this);
144        }
145    
146        /**
147         * @return the memory used by the internal queue for this MessageConsumer
148         */
149        public long getLocalMemoryUsage() {
150            return this.messageQueue.getLocalMemoryUsedByThisQueue();
151        }
152    
153        /**
154         * @return the number of messages enqueued by this consumer awaiting dispatch
155         */
156        public int size() {
157            return this.messageQueue.size();
158        }
159    
160    
161        /**
162         * @return Stats for this MessageConsumer
163         */
164        public StatsImpl getStats() {
165            return stats;
166        }
167    
168        /**
169         * @return Stats for this MessageConsumer
170         */
171        public JMSConsumerStatsImpl getConsumerStats() {
172            return stats;
173        }
174    
175        /**
176         * @return pretty print of this consumer
177         */
178        public String toString() {
179            return "MessageConsumer: " + consumerIdentifier + "[" + consumerNumber + "]";
180        }
181    
182        /**
183         * @return Returns the prefetchNumber.
184         */
185        public int getPrefetchNumber() {
186            return prefetchNumber;
187        }
188    
189        /**
190         * @param prefetchNumber The prefetchNumber to set.
191         */
192        public void setPrefetchNumber(int prefetchNumber) {
193            this.prefetchNumber = prefetchNumber;
194        }
195    
196        /**
197         * Gets this message consumer's message selector expression.
198         *
199         * @return this message consumer's message selector, or null if no message selector exists for the message consumer
200         *         (that is, if the message selector was not set or was set to null or the empty string)
201         * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
202         */
203        public String getMessageSelector() throws JMSException {
204            checkClosed();
205            return this.messageSelector;
206        }
207    
208        /**
209         * Gets the message consumer's <CODE>MessageListener</CODE>.
210         *
211         * @return the listener for the message consumer, or null if no listener is set
212         * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
213         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
214         */
215        public MessageListener getMessageListener() throws JMSException {
216            checkClosed();
217            return this.messageListener;
218        }
219    
220        /**
221         * Sets the message consumer's <CODE>MessageListener</CODE>.
222         * <P>
223         * Setting the message listener to null is the equivalent of unsetting the message listener for the message
224         * consumer.
225         * <P>
226         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
227         * existing listener or the consumer is being used to consume messages synchronously is undefined.
228         *
229         * @param listener the listener to which the messages are to be delivered
230         * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
231         * @see javax.jms.MessageConsumer#getMessageListener()
232         */
233        public void setMessageListener(MessageListener listener) throws JMSException {
234            checkClosed();
235            synchronized (messageListenerGuard) {
236                this.messageListener = listener;
237                if (listener != null) {
238                    session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
239                    //messages may already be enqueued
240                    ActiveMQMessage msg = null;
241                    try {
242                        while ((msg = (ActiveMQMessage)messageQueue.dequeueNoWait()) != null) {
243                            processMessage(msg);
244                        }
245                    }
246                    catch (InterruptedException ex) {
247                        JMSException jmsEx = new JMSException("Interrupted setting message listener");
248                        jmsEx.setLinkedException(ex);
249                        throw jmsEx;
250                    }
251                }
252            }
253        }
254    
255        /**
256         * Receives the next message produced for this message consumer.
257         * <P>
258         * This call blocks indefinitely until a message is produced or until this message consumer is closed.
259         * <P>
260         * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
261         * transaction commits.
262         *
263         * @return the next message produced for this message consumer, or null if this message consumer is concurrently
264         *         closed
265         * @throws JMSException
266         */
267        public Message receive() throws JMSException {
268            checkClosed();
269            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
270            try {
271                this.accessThread = Thread.currentThread();
272                ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
273                this.accessThread = null;
274                if (message != null) {
275                    boolean expired = message.isExpired();
276                    messageDelivered(message, true, expired);
277                    if (!expired) {
278                        message = message.shallowCopy();
279                    }
280                    else {
281                        message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
282                    }
283                }
284                if( message!=null && log.isDebugEnabled() ) {
285                    log.debug("Message received: "+message);
286                }            
287                return message;
288            }
289            catch (InterruptedException ioe) {
290                return null;
291            }
292        }
293    
294        /**
295         * Receives the next message that arrives within the specified timeout interval.
296         * <P>
297         * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
298         * timeout</CODE> of zero never expires, and the call blocks indefinitely.
299         *
300         * @param timeout the timeout value (in milliseconds)
301         * @return the next message produced for this message consumer, or null if the timeout expires or this message
302         *         consumer is concurrently closed
303         * @throws JMSException
304         */
305        public Message receive(long timeout) throws JMSException {
306            checkClosed();
307            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
308            try {
309                if (timeout == 0) {
310                    return this.receive();
311                }
312                this.accessThread = Thread.currentThread();
313                ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
314                this.accessThread = null;
315                if (message != null) {
316                    boolean expired = message.isExpired();
317                    messageDelivered(message, true, expired);
318                    if (!expired) {
319                        message = message.shallowCopy();
320                    }
321                    else {
322                        message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
323                    }
324                }
325                if( message!=null && log.isDebugEnabled() ) {
326                    log.debug("Message received: "+message);
327                }            
328                return message;
329            }
330            catch (InterruptedException ioe) {
331                return null;
332            }
333        }
334    
335        /**
336         * Receives the next message if one is immediately available.
337         *
338         * @return the next message produced for this message consumer, or null if one is not available
339         * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
340         */
341        public Message receiveNoWait() throws JMSException {
342            checkClosed();
343            session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
344            try {
345                ActiveMQMessage message = null;
346                //iterate through an scrub delivered but expired messages
347                while ((message = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) {
348                    boolean expired = message.isExpired();
349                    messageDelivered(message, true, expired);
350                    if (!expired) {
351                        if( message!=null && log.isDebugEnabled() ) {
352                            log.debug("Message received: "+message);
353                        }            
354                        return message.shallowCopy();
355                    }
356                }
357            }
358            catch (InterruptedException ioe) {
359                throw new JMSException("Queue is interrupted: " + ioe.getMessage());
360            }
361            return null;
362        }
363    
364        /**
365         * Closes the message consumer.
366         * <P>
367         * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
368         * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
369         * reclaim these resources may not be timely enough.
370         * <P>
371         * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
372         * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
373         *
374         * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
375         */
376        public void close() throws JMSException {
377            try {
378                this.accessThread.interrupt();
379            }
380            catch (NullPointerException npe) {
381            }
382            catch (SecurityException se) {
383            }
384            if (destination != null) {
385                destination.decrementConsumerCounter();
386            }
387    
388            this.session.removeConsumer(this);
389            messageQueue.close();
390            closed = true;
391        }
392    
393        /**
394         * @return true if this is a durable topic subscriber
395         */
396        public boolean isDurableSubscriber() {
397            return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
398        }
399        
400        /**
401         * @return true if this is a Transient Topic subscriber
402         */
403        public boolean isTransientSubscriber(){
404            return this.destination != null && destination.isTopic() && (consumerName == null || consumerName.length() ==0);
405        }
406    
407        /**
408         * @throws IllegalStateException
409         */
410        protected void checkClosed() throws IllegalStateException {
411            if (closed) {
412                throw new IllegalStateException("The Consumer is closed");
413            }
414        }
415    
416        /**
417         * Process a Message - passing either to the queue or message listener
418         *
419         * @param message
420         */
421        synchronized protected void processMessage(ActiveMQMessage message) {
422            if( !running ) {
423                stoppedQueue.addLast(message);
424                return;
425            }
426            message.setConsumerIdentifer(this.consumerIdentifier);
427            MessageListener listener = null;
428            synchronized (messageListenerGuard) {
429                listener = this.messageListener;
430            }
431            boolean transacted = session.isTransacted();
432            try {
433                if (!closed) {
434                    if (message.getJMSActiveMQDestination() == null) {
435                        message.setJMSDestination(getDestination());
436                    }
437                    if (listener != null) {
438                        beforeMessageDelivered(message);
439                        boolean expired = message.isExpired();
440                        if (transacted) {
441                            afterMessageDelivered(message, true, expired, true);
442                        }
443                        if (!expired) {
444                            if( log.isDebugEnabled() ) {
445                                log.debug("Message delivered to message listener: "+message);
446                            }
447                            listener.onMessage(message.shallowCopy());
448                        }
449                        if (!transacted) {
450                            afterMessageDelivered(message, true, expired, true);
451                        }
452                    }
453                    else {
454                        this.messageQueue.enqueue(message);
455                    }
456                }
457                else {
458                    messageDelivered(message, false, false);
459                }
460            }
461            catch (Throwable e) {
462                log.warn("could not process message: " + message + ". Reason: " + e, e);
463                messageDelivered(message, false, false);
464            }
465        }
466    
467        /**
468         * @return Returns the consumerId.
469         */
470        protected String getConsumerIdentifier() {
471            return consumerIdentifier;
472        }
473    
474        /**
475         * @return the consumer name - used for durable consumers
476         */
477        protected String getConsumerName() {
478            return this.consumerName;
479        }
480    
481        /**
482         * Set the name of the Consumer - used for durable subscribers
483         *
484         * @param value
485         */
486        protected void setConsumerName(String value) {
487            this.consumerName = value;
488        }
489    
490        /**
491         * @return the locally unique Consumer Number
492         */
493        protected int getConsumerNumber() {
494            return this.consumerNumber;
495        }
496    
497        /**
498         * Set the locally unique consumer number
499         *
500         * @param value
501         */
502        protected void setConsumerNumber(int value) {
503            this.consumerNumber = value;
504        }
505    
506        /**
507         * @return true if this consumer does not accept locally produced messages
508         */
509        protected boolean isNoLocal() {
510            return this.noLocal;
511        }
512    
513        /**
514         * Retrive is a browser
515         *
516         * @return true if a browser
517         */
518        protected boolean isBrowser() {
519            return this.browser;
520        }
521    
522        /**
523         * Set true if only a Browser
524         *
525         * @param value
526         * @see ActiveMQQueueBrowser
527         */
528        protected void setBrowser(boolean value) {
529            this.browser = value;
530        }
531    
532        /**
533         * @return ActiveMQDestination
534         */
535        protected ActiveMQDestination getDestination() {
536            return this.destination;
537        }
538    
539        /**
540         * @return the startTime
541         */
542        protected long getStartTime() {
543            return startTime;
544        }
545    
546        protected void clearMessagesInProgress() {
547            messageQueue.clear();
548            stoppedQueue.clear();
549        }
550    
551        private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) {
552            afterMessageDelivered(message, messageRead, messageExpired, false);
553        }
554    
555        private void beforeMessageDelivered(ActiveMQMessage message) {
556            if (message == null) {
557                return;
558            }
559            boolean topic = destination != null && destination.isTopic();
560            message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
561            this.session.beforeMessageDelivered(message);
562        }
563    
564        private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) {
565            if (message == null) {
566                return;
567            }
568    
569            boolean consumed = browser ? false : messageRead;
570            ActiveMQDestination destination = message.getJMSActiveMQDestination();
571            boolean topic = destination != null && destination.isTopic();
572            message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
573            this.session.afterMessageDelivered((isDurableSubscriber() || this.destination.isQueue()), message, consumed, messageExpired, beforeCalled);
574            if (messageRead) {
575                stats.onMessage(message);
576            }
577    
578        }
579    
580        synchronized public void start() {
581            running=true;
582            while( !stoppedQueue.isEmpty() ) {
583                ActiveMQMessage m = (ActiveMQMessage)stoppedQueue.removeFirst();
584                processMessage(m);
585            }
586        }
587    
588        synchronized public void stop() {
589            running=false;
590        }
591    
592    }