001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq;
020    
021    import java.util.Iterator;
022    
023    import javax.jms.JMSException;
024    
025    import org.activemq.io.util.MemoryBoundedQueue;
026    import org.activemq.message.ActiveMQMessage;
027    
028    /**
029     * A utility class used by the Session for dispatching messages asynchronously to consumers
030     *
031     * @version $Revision: 1.1.1.1 $
032     * @see javax.jms.Session
033     */
034    public class ActiveMQSessionExecutor implements Runnable {
035        private ActiveMQSession session;
036        private MemoryBoundedQueue messageQueue;
037        private boolean closed;
038        private Thread runner;
039        private boolean dispatchedBySessionPool;
040        private boolean optimizedMessageDispatch;
041    
042        ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
043            this.session = session;
044            this.messageQueue = queue;
045        }
046    
047        void setDispatchedBySessionPool(boolean value) {
048            dispatchedBySessionPool = value;
049        }
050        
051        /**
052         * @return Returns the optimizedMessageDispatch.
053         */
054        boolean isOptimizedMessageDispatch() {
055            return optimizedMessageDispatch;
056        }
057        /**
058         * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
059         */
060        void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
061            this.optimizedMessageDispatch = optimizedMessageDispatch;
062        }
063    
064        void execute(ActiveMQMessage message) {
065            if (optimizedMessageDispatch && !dispatchedBySessionPool){
066                dispatch(message);
067            }else {
068                messageQueue.enqueue(message);
069            }
070           
071        }
072    
073        void executeFirst(ActiveMQMessage message) {
074            messageQueue.enqueueFirstNoBlock(message);
075        }
076    
077        boolean hasUncomsumedMessages() {
078            return !messageQueue.isEmpty();
079        }
080    
081        /**
082         * implementation of Runnable
083         */
084        public void run() {
085            while (!closed && !dispatchedBySessionPool) {
086                ActiveMQMessage message = null;
087                try {
088                    message = (ActiveMQMessage) messageQueue.dequeue(100);
089                }
090                catch (InterruptedException ie) {
091                }
092                if (!closed) {
093                    if (message != null) {
094                        if (!dispatchedBySessionPool) {
095                            dispatch(message);
096                        }
097                        else {
098                            messageQueue.enqueueFirstNoBlock(message);
099                        }
100                    }
101                }
102            }
103        }
104        
105        void dispatch(ActiveMQMessage message){
106            for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
107                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
108                if (message.isConsumerTarget(consumer.getConsumerNumber())) {
109                    try {
110                        consumer.processMessage(message.shallowCopy());
111                    }
112                    catch (JMSException e) {
113                        this.session.connection.handleAsyncException(e);
114                    }
115                }
116            }
117        }
118    
119        synchronized void start() {
120            messageQueue.start();
121            if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) {
122                runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId());
123                runner.setPriority(Thread.MAX_PRIORITY);
124                //runner.setDaemon(true);
125                runner.start();
126            }
127        }
128    
129        synchronized void stop() {
130            messageQueue.stop();
131        }
132    
133        synchronized void close() {
134            closed = true;
135            messageQueue.close();
136        }
137    
138        void clear() {
139            messageQueue.clear();
140        }
141    
142        ActiveMQMessage dequeueNoWait() {
143            try {
144                return (ActiveMQMessage) messageQueue.dequeueNoWait();
145            }
146            catch (InterruptedException ie) {
147                return null;
148            }
149        }
150        
151        protected void clearMessagesInProgress(){
152            messageQueue.clear();
153        }
154        
155        
156    }