001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq;
020    import java.io.IOException;
021    import java.io.Serializable;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.ListIterator;
025    import java.util.Map;
026    
027    import javax.jms.BytesMessage;
028    import javax.jms.DeliveryMode;
029    import javax.jms.Destination;
030    import javax.jms.IllegalStateException;
031    import javax.jms.InvalidDestinationException;
032    import javax.jms.InvalidSelectorException;
033    import javax.jms.JMSException;
034    import javax.jms.MapMessage;
035    import javax.jms.Message;
036    import javax.jms.MessageConsumer;
037    import javax.jms.MessageListener;
038    import javax.jms.MessageProducer;
039    import javax.jms.ObjectMessage;
040    import javax.jms.Queue;
041    import javax.jms.QueueBrowser;
042    import javax.jms.QueueReceiver;
043    import javax.jms.QueueSender;
044    import javax.jms.QueueSession;
045    import javax.jms.Session;
046    import javax.jms.StreamMessage;
047    import javax.jms.TemporaryQueue;
048    import javax.jms.TemporaryTopic;
049    import javax.jms.TextMessage;
050    import javax.jms.Topic;
051    import javax.jms.TopicPublisher;
052    import javax.jms.TopicSession;
053    import javax.jms.TopicSubscriber;
054    import javax.jms.TransactionRolledBackException;
055    
056    import org.activemq.io.util.ByteArray;
057    import org.activemq.io.util.ByteArrayCompression;
058    import org.activemq.io.util.ByteArrayFragmentation;
059    import org.activemq.management.JMSSessionStatsImpl;
060    import org.activemq.management.StatsCapable;
061    import org.activemq.management.StatsImpl;
062    import org.activemq.message.ActiveMQBytesMessage;
063    import org.activemq.message.ActiveMQDestination;
064    import org.activemq.message.ActiveMQMapMessage;
065    import org.activemq.message.ActiveMQMessage;
066    import org.activemq.message.ActiveMQObjectMessage;
067    import org.activemq.message.ActiveMQQueue;
068    import org.activemq.message.ActiveMQStreamMessage;
069    import org.activemq.message.ActiveMQTemporaryQueue;
070    import org.activemq.message.ActiveMQTemporaryTopic;
071    import org.activemq.message.ActiveMQTextMessage;
072    import org.activemq.message.ActiveMQTopic;
073    import org.activemq.message.ConsumerInfo;
074    import org.activemq.message.DurableUnsubscribe;
075    import org.activemq.message.MessageAck;
076    import org.activemq.message.MessageAcknowledge;
077    import org.activemq.message.ProducerInfo;
078    import org.activemq.service.impl.DefaultQueueList;
079    import org.activemq.util.IdGenerator;
080    import org.apache.commons.logging.Log;
081    import org.apache.commons.logging.LogFactory;
082    
083    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
084    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
085    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
086    
087    /**
088     * <P>
089     * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
090     * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
091     * <P>
092     * A session serves several purposes:
093     * <UL>
094     * <LI>It is a factory for its message producers and consumers.
095     * <LI>It supplies provider-optimized message factories.
096     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
097     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
098     * dynamically manipulate provider-specific destination names.
099     * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
100     * units.
101     * <LI>It defines a serial order for the messages it consumes and the messages it produces.
102     * <LI>It retains messages it consumes until they have been acknowledged.
103     * <LI>It serializes execution of message listeners registered with its message consumers.
104     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
105     * </UL>
106     * <P>
107     * A session can create and service multiple message producers and consumers.
108     * <P>
109     * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
110     * The thread may then use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
111     * <P>
112     * If a client desires to have one thread produce messages while others consume them, the client should use a separate
113     * session for its producing thread.
114     * <P>
115     * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
116     * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
117     * constituent objects from another thread of control. The only exception to this rule is the use of the session or
118     * connection <CODE>close</CODE> method.
119     * <P>
120     * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
121     * start simply and incrementally add message processing complexity as their need for concurrency grows.
122     * <P>
123     * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
124     * being executed in another thread.
125     * <P>
126     * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
127     * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
128     * transactions organize a session's input message stream and output message stream into series of atomic units. When a
129     * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
130     * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
131     * recovered.
132     * <P>
133     * The content of a transaction's input and output units is simply those messages that have been produced and consumed
134     * within the session's current transaction.
135     * <P>
136     * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
137     * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
138     * transacted session always has a current transaction within which its work is done.
139     * <P>
140     * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
141     * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
142     * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
143     * methods in this context is prohibited.
144     * <P>
145     * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
146     * <P>
147     * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
148     * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
149     * JMS API into their application server products.
150     * 
151     * @version $Revision: 1.1.1.1 $
152     * @see javax.jms.Session
153     * @see javax.jms.QueueSession
154     * @see javax.jms.TopicSession
155     * @see javax.jms.XASession
156     */
157    public class ActiveMQSession
158            implements
159                Session,
160                QueueSession,
161                TopicSession,
162                ActiveMQMessageDispatcher,
163                MessageAcknowledge,
164                StatsCapable {
165        
166        public static interface DeliveryListener {
167            public void beforeDelivery(ActiveMQSession session, Message msg);
168            public void afterDelivery(ActiveMQSession session, Message msg);
169        }
170        
171        protected static final int CONSUMER_DISPATCH_UNSET = 1;
172        protected static final int CONSUMER_DISPATCH_ASYNC = 2;
173        protected static final int CONSUMER_DISPATCH_SYNC = 3;
174        private static final Log log = LogFactory.getLog(ActiveMQSession.class);
175        protected ActiveMQConnection connection;
176        protected int acknowledgeMode;
177        protected CopyOnWriteArrayList consumers;
178        protected CopyOnWriteArrayList producers;
179        private IdGenerator temporaryDestinationGenerator;
180        private MessageListener messageListener;
181        protected boolean closed;
182        private SynchronizedBoolean started;
183        private short sessionId;
184        private long startTime;
185        private DefaultQueueList deliveredMessages;
186        private ActiveMQSessionExecutor messageExecutor;
187        private JMSSessionStatsImpl stats;
188        private int consumerDispatchState;
189        private ByteArrayCompression compression;
190        private ByteArrayFragmentation fragmentation;
191        private Map assemblies; //used for assembling message fragments
192        private TransactionContext transactionContext;
193        private boolean internalSession;
194        private DeliveryListener deliveryListener;
195        
196        /**
197         * Construct the Session
198         * 
199         * @param theConnection
200         * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
201         * @throws JMSException on internal error
202         */
203        protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
204            this(theConnection, theAcknowledgeMode,theConnection.isOptimizedMessageDispatch());
205        }
206    
207        /**
208         * Construct the Session
209         * 
210         * @param theConnection
211         * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
212         * @param optimizedDispatch
213         * @throws JMSException on internal error
214         */
215        protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode,boolean optimizedDispatch) throws JMSException {
216            this.connection = theConnection;
217            this.acknowledgeMode = theAcknowledgeMode;
218            setTransactionContext(new TransactionContext(theConnection));
219            this.consumers = new CopyOnWriteArrayList();
220            this.producers = new CopyOnWriteArrayList();
221            this.temporaryDestinationGenerator = new IdGenerator();
222            this.started = new SynchronizedBoolean(false);
223            this.sessionId = connection.generateSessionId();
224            this.startTime = System.currentTimeMillis();
225            this.deliveredMessages = new DefaultQueueList();
226            this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue("Session("
227                    + sessionId + ")"));
228            this.messageExecutor.setOptimizedMessageDispatch(optimizedDispatch);
229            connection.addSession(this);
230            stats = new JMSSessionStatsImpl(producers, consumers);
231            this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
232            this.compression = new ByteArrayCompression();
233            this.compression.setCompressionLevel(theConnection.getMessageCompressionLevel());
234            this.compression.setCompressionStrategy(theConnection.getMessageCompressionStrategy());
235            this.compression.setCompressionLimit(theConnection.getMessageCompressionLimit());
236            
237            this.fragmentation = new ByteArrayFragmentation();
238            this.fragmentation.setFragmentationLimit(theConnection.getMessageFragmentationLimit());
239            this.assemblies = new ConcurrentHashMap();
240            this.internalSession = theConnection.isInternalConnection();
241        }
242    
243        public void setTransactionContext(TransactionContext transactionContext) {
244            if( this.transactionContext!=null ) {
245                this.transactionContext.removeSession(this);
246            }        
247            this.transactionContext = transactionContext;
248            this.transactionContext.addSession(this);
249        }
250        
251        public TransactionContext getTransactionContext() {
252            return transactionContext;
253        }
254    
255        public StatsImpl getStats() {
256            return stats;
257        }
258    
259        public JMSSessionStatsImpl getSessionStats() {
260            return stats;
261        }
262    
263        /**
264         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
265         * containing a stream of uninterpreted bytes.
266         * 
267         * @return the an ActiveMQBytesMessage
268         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
269         */
270        public BytesMessage createBytesMessage() throws JMSException {
271            checkClosed();
272            return new ActiveMQBytesMessage();
273        }
274    
275        /**
276         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
277         * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
278         * Java programming language.
279         * 
280         * @return an ActiveMQMapMessage
281         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
282         */
283        public MapMessage createMapMessage() throws JMSException {
284            checkClosed();
285            return new ActiveMQMapMessage();
286        }
287    
288        /**
289         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
290         * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
291         * a message containing only header information is sufficient.
292         * 
293         * @return an ActiveMQMessage
294         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
295         */
296        public Message createMessage() throws JMSException {
297            checkClosed();
298            return new ActiveMQMessage();
299        }
300    
301        /**
302         * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
303         * that contains a serializable Java object.
304         * 
305         * @return an ActiveMQObjectMessage
306         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
307         */
308        public ObjectMessage createObjectMessage() throws JMSException {
309            checkClosed();
310            return new ActiveMQObjectMessage();
311        }
312    
313        /**
314         * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
315         * send a message that contains a serializable Java object.
316         * 
317         * @param object the object to use to initialize this message
318         * @return an ActiveMQObjectMessage
319         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
320         */
321        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
322            checkClosed();
323            ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
324            msg.setObject(object);
325            return msg;
326        }
327    
328        /**
329         * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
330         * self-defining stream of primitive values in the Java programming language.
331         * 
332         * @return an ActiveMQStreamMessage
333         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
334         */
335        public StreamMessage createStreamMessage() throws JMSException {
336            checkClosed();
337            return new ActiveMQStreamMessage();
338        }
339    
340        /**
341         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
342         * containing a <CODE>String</CODE> object.
343         * 
344         * @return an ActiveMQTextMessage
345         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
346         */
347        public TextMessage createTextMessage() throws JMSException {
348            checkClosed();
349            return new ActiveMQTextMessage();
350        }
351    
352        /**
353         * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
354         * message containing a <CODE>String</CODE>.
355         * 
356         * @param text the string used to initialize this message
357         * @return an ActiveMQTextMessage
358         * @throws JMSException if the JMS provider fails to create this message due to some internal error.
359         */
360        public TextMessage createTextMessage(String text) throws JMSException {
361            checkClosed();
362            ActiveMQTextMessage msg = new ActiveMQTextMessage();
363            msg.setText(text);
364            return msg;
365        }
366    
367        /**
368         * Indicates whether the session is in transacted mode.
369         * 
370         * @return true if the session is in transacted mode
371         * @throws JMSException if there is some internal error.
372         */
373        public boolean getTransacted() throws JMSException {
374            checkClosed();
375            return this.acknowledgeMode == Session.SESSION_TRANSACTED || transactionContext.isInXATransaction();
376        }
377    
378        /**
379         * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
380         * created. If the session is transacted, the acknowledgement mode is ignored.
381         * 
382         * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
383         * session is transacted, returns SESSION_TRANSACTED.
384         * @throws JMSException
385         * @see javax.jms.Connection#createSession(boolean,int)
386         * @since 1.1 exception JMSException if there is some internal error.
387         */
388        public int getAcknowledgeMode() throws JMSException {
389            checkClosed();
390            return this.acknowledgeMode;
391        }
392    
393        /**
394         * Commits all messages done in this transaction and releases any locks currently held.
395         * 
396         * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
397         * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
398         * commit.
399         * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
400         */
401        public void commit() throws JMSException {
402            checkClosed();
403            if (!getTransacted()) {
404                throw new javax.jms.IllegalStateException("Not a transacted session");
405            }
406            transactionContext.commit();
407        }
408    
409        /**
410         * Rolls back any messages done in this transaction and releases any locks currently held.
411         * 
412         * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
413         * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
414         */
415        public void rollback() throws JMSException {
416            checkClosed();
417            if (!getTransacted()) {
418                throw new javax.jms.IllegalStateException("Not a transacted session");
419            }
420            transactionContext.rollback();
421        }
422    
423        public void clearDeliveredMessages() {
424            deliveredMessages.clear();        
425        }
426        
427        /**
428         * Closes the session.
429         * <P>
430         * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
431         * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
432         * be timely enough.
433         * <P>
434         * There is no need to close the producers and consumers of a closed session.
435         * <P>
436         * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
437         * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
438         * <P>
439         * Closing a transacted session must roll back the transaction in progress.
440         * <P>
441         * This method is the only <CODE>Session</CODE> method that can be called concurrently.
442         * <P>
443         * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
444         * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
445         * 
446         * @throws JMSException if the JMS provider fails to close the session due to some internal error.
447         */
448        public void close() throws JMSException {
449            if (!this.closed) {
450                if (getTransactionContext().isInLocalTransaction()) {
451                    rollback();
452                }
453                doClose();
454                closed = true;
455            }
456        }
457    
458        protected void doClose() throws JMSException {
459            doAcknowledge(true);
460            deliveredMessages.clear();
461            for (Iterator i = consumers.iterator();i.hasNext();) {
462                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
463                consumer.close();
464            }
465            for (Iterator i = producers.iterator();i.hasNext();) {
466                ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
467                producer.close();
468            }
469            consumers.clear();
470            producers.clear();
471            this.connection.removeSession(this);
472            messageExecutor.close();
473        }
474    
475        /**
476         * @throws IllegalStateException if the Session is closed
477         */
478        protected void checkClosed() throws IllegalStateException {
479            if (this.closed) {
480                throw new IllegalStateException("The Session is closed");
481            }
482        }
483    
484        /**
485         * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
486         * <P>
487         * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
488         * messages that have been delivered to the client.
489         * <P>
490         * Restarting a session causes it to take the following actions:
491         * <UL>
492         * <LI>Stop message delivery
493         * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
494         * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
495         * Redelivered messages do not have to be delivered in exactly their original delivery order.
496         * </UL>
497         * 
498         * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
499         * @throws IllegalStateException if the method is called by a transacted session.
500         */
501        public void recover() throws JMSException {
502            checkClosed();
503            if (getTransacted()) {
504                throw new IllegalStateException("This session is transacted");
505            }
506            redeliverUnacknowledgedMessages();
507        }
508    
509        /**
510         * Returns the session's distinguished message listener (optional).
511         * 
512         * @return the message listener associated with this session
513         * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
514         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
515         * @see javax.jms.ServerSessionPool
516         * @see javax.jms.ServerSession
517         */
518        public MessageListener getMessageListener() throws JMSException {
519            checkClosed();
520            return this.messageListener;
521        }
522    
523        /**
524         * Sets the session's distinguished message listener (optional).
525         * <P>
526         * When the distinguished message listener is set, no other form of message receipt in the session can be used;
527         * however, all forms of sending messages are still supported.
528         * <P>
529         * This is an expert facility not used by regular JMS clients.
530         * 
531         * @param listener the message listener to associate with this session
532         * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
533         * @see javax.jms.Session#getMessageListener()
534         * @see javax.jms.ServerSessionPool
535         * @see javax.jms.ServerSession
536         */
537        public void setMessageListener(MessageListener listener) throws JMSException {
538            checkClosed();
539            this.messageListener = listener;
540            if (listener != null) {
541                messageExecutor.setDispatchedBySessionPool(true);
542            }
543        }
544    
545        /**
546         * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
547         * 
548         * @see javax.jms.ServerSession
549         */
550        public void run() {
551            ActiveMQMessage message;
552            while ((message = messageExecutor.dequeueNoWait()) != null) {
553                if( deliveryListener!=null )
554                    deliveryListener.beforeDelivery(this, message);
555                beforeMessageDelivered(message);
556                deliver(message);
557                if( deliveryListener!=null )
558                    deliveryListener.afterDelivery(this, message);
559            }
560        }
561    
562        /**
563         * Delivers a message to the messageListern
564         * @param message The message to deliver
565         */ 
566        private void deliver(ActiveMQMessage message) {
567            message = assembleMessage(message);
568            if (message != null && !message.isExpired() && this.messageListener != null) {
569                try {
570                    
571                    if( log.isDebugEnabled() ) {
572                        log.debug("Message delivered to session message listener: "+message);
573                    }
574                    
575                    this.messageListener.onMessage(message);
576                    this.afterMessageDelivered(true, message, true, false, true);
577                }
578                catch (Throwable t) {
579                    log.info("Caught :" + t, t);
580                    this.afterMessageDelivered(true, message, false, false, true);
581                }
582            }
583            else {
584                this.afterMessageDelivered(true, message, false, message.isExpired(), true);
585            }
586        }
587    
588        /**
589         * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
590         * <P>
591         * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
592         * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
593         * destination parameter to create a <CODE>MessageProducer</CODE> object.
594         * 
595         * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
596         * specified destination.
597         * @return the MessageProducer
598         * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
599         * @throws InvalidDestinationException if an invalid destination is specified.
600         * @since 1.1
601         */
602        public MessageProducer createProducer(Destination destination) throws JMSException {
603            checkClosed();
604            return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
605        }
606    
607        /**
608         * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
609         * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
610         * create a <CODE>MessageConsumer</CODE>.
611         * 
612         * @param destination the <CODE>Destination</CODE> to access.
613         * @return the MessageConsumer
614         * @throws JMSException if the session fails to create a consumer due to some internal error.
615         * @throws InvalidDestinationException if an invalid destination is specified.
616         * @since 1.1
617         */
618        public MessageConsumer createConsumer(Destination destination) throws JMSException {
619            checkClosed();
620            int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
621                    .getPrefetchPolicy().getQueuePrefetch();
622            return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
623                    "", this.connection.getNextConsumerNumber(), prefetch, false, false);
624        }
625    
626        /**
627         * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
628         * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
629         * destination parameter to create a <CODE>MessageConsumer</CODE>.
630         * <P>
631         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
632         * 
633         * @param destination the <CODE>Destination</CODE> to access
634         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
635         * value of null or an empty string indicates that there is no message selector for the message consumer.
636         * @return the MessageConsumer
637         * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
638         * @throws InvalidDestinationException if an invalid destination is specified.
639         * @throws InvalidSelectorException if the message selector is invalid.
640         * @since 1.1
641         */
642        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
643            checkClosed();
644            int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
645                    .getPrefetchPolicy().getQueuePrefetch();
646            return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
647                    messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
648        }
649    
650        /**
651         * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
652         * specify whether messages published by its own connection should be delivered to it, if the destination is a
653         * topic.
654         * <P>
655         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
656         * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
657         * <P>
658         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
659         * destination.
660         * <P>
661         * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
662         * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
663         * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
664         * topics.
665         * 
666         * @param destination the <CODE>Destination</CODE> to access
667         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
668         * value of null or an empty string indicates that there is no message selector for the message consumer.
669         * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
670         * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
671         * @return the MessageConsumer
672         * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
673         * @throws InvalidDestinationException if an invalid destination is specified.
674         * @throws InvalidSelectorException if the message selector is invalid.
675         * @since 1.1
676         */
677        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
678                throws JMSException {
679            checkClosed();
680            int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
681            return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
682                    messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
683        }
684    
685        /**
686         * Creates a queue identity given a <CODE>Queue</CODE> name.
687         * <P>
688         * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
689         * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
690         * not portable.
691         * <P>
692         * Note that this method is not for creating the physical queue. The physical creation of queues is an
693         * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
694         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
695         * 
696         * @param queueName the name of this <CODE>Queue</CODE>
697         * @return a <CODE>Queue</CODE> with the given name
698         * @throws JMSException if the session fails to create a queue due to some internal error.
699         * @since 1.1
700         */
701        public Queue createQueue(String queueName) throws JMSException {
702            checkClosed();
703            return new ActiveMQQueue(queueName);
704        }
705    
706        /**
707         * Creates a topic identity given a <CODE>Topic</CODE> name.
708         * <P>
709         * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
710         * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
711         * not portable.
712         * <P>
713         * Note that this method is not for creating the physical topic. The physical creation of topics is an
714         * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
715         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
716         * 
717         * @param topicName the name of this <CODE>Topic</CODE>
718         * @return a <CODE>Topic</CODE> with the given name
719         * @throws JMSException if the session fails to create a topic due to some internal error.
720         * @since 1.1
721         */
722        public Topic createTopic(String topicName) throws JMSException {
723            checkClosed();
724            return new ActiveMQTopic(topicName);
725        }
726    
727        /**
728         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
729         * 
730         * @param queue the <CODE>queue</CODE> to access
731         * @exception InvalidDestinationException if an invalid destination is specified
732         * @since 1.1
733         */
734        /**
735         * Creates a durable subscriber to the specified topic.
736         * <P>
737         * If a client needs to receive all the messages published on a topic, including the ones published while the
738         * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
739         * this durable subscription and insures that all messages from the topic's publishers are retained until they are
740         * acknowledged by this durable subscriber or they have expired.
741         * <P>
742         * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
743         * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
744         * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
745         * <P>
746         * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
747         * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
748         * unsubscribing (deleting) the old one and creating a new one.
749         * <P>
750         * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
751         * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
752         * value for this attribute is false.
753         * 
754         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
755         * @param name the name used to identify this subscription
756         * @return the TopicSubscriber
757         * @throws JMSException if the session fails to create a subscriber due to some internal error.
758         * @throws InvalidDestinationException if an invalid topic is specified.
759         * @since 1.1
760         */
761        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
762            checkClosed();
763            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
764                    this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
765                    false, false);
766        }
767    
768        /**
769         * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
770         * published by its own connection should be delivered to it.
771         * <P>
772         * If a client needs to receive all the messages published on a topic, including the ones published while the
773         * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
774         * this durable subscription and insures that all messages from the topic's publishers are retained until they are
775         * acknowledged by this durable subscriber or they have expired.
776         * <P>
777         * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
778         * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
779         * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
780         * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
781         * <P>
782         * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
783         * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
784         * unsubscribing (deleting) the old one and creating a new one.
785         * 
786         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
787         * @param name the name used to identify this subscription
788         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
789         * value of null or an empty string indicates that there is no message selector for the message consumer.
790         * @param noLocal if set, inhibits the delivery of messages published by its own connection
791         * @return the Queue Browser
792         * @throws JMSException if the session fails to create a subscriber due to some internal error.
793         * @throws InvalidDestinationException if an invalid topic is specified.
794         * @throws InvalidSelectorException if the message selector is invalid.
795         * @since 1.1
796         */
797        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
798                throws JMSException {
799            checkClosed();
800            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
801                    messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
802                            .getDurableTopicPrefetch(), noLocal, false);
803        }
804    
805        /**
806         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
807         * 
808         * @param queue the <CODE>queue</CODE> to access
809         * @return the Queue Browser
810         * @throws JMSException if the session fails to create a browser due to some internal error.
811         * @throws InvalidDestinationException if an invalid destination is specified
812         * @since 1.1
813         */
814        public QueueBrowser createBrowser(Queue queue) throws JMSException {
815            checkClosed();
816            return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), "",
817                    this.connection.getNextConsumerNumber());
818        }
819    
820        /**
821         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
822         * selector.
823         * 
824         * @param queue the <CODE>queue</CODE> to access
825         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
826         * value of null or an empty string indicates that there is no message selector for the message consumer.
827         * @return the Queue Browser
828         * @throws JMSException if the session fails to create a browser due to some internal error.
829         * @throws InvalidDestinationException if an invalid destination is specified
830         * @throws InvalidSelectorException if the message selector is invalid.
831         * @since 1.1
832         */
833        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
834            checkClosed();
835            return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue),
836                    messageSelector, this.connection.getNextConsumerNumber());
837        }
838    
839        /**
840         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
841         * it is deleted earlier.
842         * 
843         * @return a temporary queue identity
844         * @throws JMSException if the session fails to create a temporary queue due to some internal error.
845         * @since 1.1
846         */
847        public TemporaryQueue createTemporaryQueue() throws JMSException {
848            checkClosed();
849            String tempQueueName = "TemporaryQueue-"
850                    + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
851            tempQueueName += this.temporaryDestinationGenerator.generateId();
852            ActiveMQTemporaryQueue tempQueue =  new ActiveMQTemporaryQueue(tempQueueName);
853           tempQueue.setSessionCreatedBy(this);
854           this.connection.startTemporaryDestination(tempQueue);
855           return tempQueue;
856        }
857    
858        /**
859         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
860         * it is deleted earlier.
861         * 
862         * @return a temporary topic identity
863         * @throws JMSException if the session fails to create a temporary topic due to some internal error.
864         * @since 1.1
865         */
866        public TemporaryTopic createTemporaryTopic() throws JMSException {
867            checkClosed();
868            String tempTopicName = "TemporaryTopic-"
869                    + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
870            tempTopicName += this.temporaryDestinationGenerator.generateId();
871            ActiveMQTemporaryTopic tempTopic =  new ActiveMQTemporaryTopic(tempTopicName);
872            tempTopic.setSessionCreatedBy(this);
873            this.connection.startTemporaryDestination(tempTopic);
874            return tempTopic;
875        }
876    
877        /**
878         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
879         * 
880         * @param queue the <CODE>Queue</CODE> to access
881         * @return @throws JMSException if the session fails to create a receiver due to some internal error.
882         * @throws JMSException
883         * @throws InvalidDestinationException if an invalid queue is specified.
884         */
885        public QueueReceiver createReceiver(Queue queue) throws JMSException {
886            checkClosed();
887            return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
888                    .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
889        }
890    
891        /**
892         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
893         * selector.
894         * 
895         * @param queue the <CODE>Queue</CODE> to access
896         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
897         * value of null or an empty string indicates that there is no message selector for the message consumer.
898         * @return QueueReceiver
899         * @throws JMSException if the session fails to create a receiver due to some internal error.
900         * @throws InvalidDestinationException if an invalid queue is specified.
901         * @throws InvalidSelectorException if the message selector is invalid.
902         */
903        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
904            checkClosed();
905            return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
906                    messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
907                            .getQueuePrefetch());
908        }
909    
910        /**
911         * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
912         * 
913         * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
914         * @return QueueSender
915         * @throws JMSException if the session fails to create a sender due to some internal error.
916         * @throws InvalidDestinationException if an invalid queue is specified.
917         */
918        public QueueSender createSender(Queue queue) throws JMSException {
919            checkClosed();
920            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
921        }
922    
923        /**
924         * Creates a nondurable subscriber to the specified topic. <p/>
925         * <P>
926         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
927         * <p/>
928         * <P>
929         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
930         * while they are active. <p/>
931         * <P>
932         * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
933         * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
934         * value for this attribute is false.
935         * 
936         * @param topic the <CODE>Topic</CODE> to subscribe to
937         * @return TopicSubscriber
938         * @throws JMSException if the session fails to create a subscriber due to some internal error.
939         * @throws InvalidDestinationException if an invalid topic is specified.
940         */
941        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
942            checkClosed();
943            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
944                    this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
945                    false);
946        }
947    
948        /**
949         * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
950         * published by its own connection should be delivered to it. <p/>
951         * <P>
952         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
953         * <p/>
954         * <P>
955         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
956         * while they are active. <p/>
957         * <P>
958         * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
959         * subscriber's perspective, they do not exist. <p/>
960         * <P>
961         * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
962         * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
963         * value for this attribute is false.
964         * 
965         * @param topic the <CODE>Topic</CODE> to subscribe to
966         * @param messageSelector only messages with properties matching the message selector expression are delivered. A
967         * value of null or an empty string indicates that there is no message selector for the message consumer.
968         * @param noLocal if set, inhibits the delivery of messages published by its own connection
969         * @return TopicSubscriber
970         * @throws JMSException if the session fails to create a subscriber due to some internal error.
971         * @throws InvalidDestinationException if an invalid topic is specified.
972         * @throws InvalidSelectorException if the message selector is invalid.
973         */
974        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
975            checkClosed();
976            return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
977                    messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
978                            .getTopicPrefetch(), noLocal, false);
979        }
980    
981        /**
982         * Creates a publisher for the specified topic. <p/>
983         * <P>
984         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
985         * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
986         * relationship with the messages it has previously sent.
987         * 
988         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
989         * @return TopicPublisher
990         * @throws JMSException if the session fails to create a publisher due to some internal error.
991         * @throws InvalidDestinationException if an invalid topic is specified.
992         */
993        public TopicPublisher createPublisher(Topic topic) throws JMSException {
994            checkClosed();
995            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
996        }
997    
998        /**
999         * Unsubscribes a durable subscription that has been created by a client.
1000         * <P>
1001         * This method deletes the state being maintained on behalf of the subscriber by its provider.
1002         * <P>
1003         * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
1004         * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
1005         * transaction or has not been acknowledged in the session.
1006         * 
1007         * @param name the name used to identify this subscription
1008         * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
1009         * @throws InvalidDestinationException if an invalid subscription name is specified.
1010         * @since 1.1
1011         */
1012        public void unsubscribe(String name) throws JMSException {
1013            checkClosed();
1014            DurableUnsubscribe ds = new DurableUnsubscribe();
1015            ds.setClientId(this.connection.getClientID());
1016            ds.setSubscriberName(name);
1017            this.connection.syncSendPacket(ds);
1018        }
1019    
1020        /**
1021         * Tests to see if the Message Dispatcher is a target for this message
1022         * 
1023         * @param message the message to test
1024         * @return true if the Message Dispatcher can dispatch the message
1025         */
1026        public boolean isTarget(ActiveMQMessage message) {
1027            for (Iterator i = this.consumers.iterator();i.hasNext();) {
1028                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1029                if (message.isConsumerTarget(consumer.getConsumerNumber())) {
1030                    return true;
1031                }
1032            }
1033            return false;
1034        }
1035    
1036        /**
1037         * Dispatch an ActiveMQMessage
1038         * 
1039         * @param message
1040         */
1041        public void dispatch(ActiveMQMessage message) {
1042            message = assembleMessage(message);
1043            if (message != null){
1044                message.setMessageAcknowledge(this);
1045                messageExecutor.execute(message);
1046            }
1047        }
1048    
1049        /**
1050         * Acknowledges all consumed messages of the session of this consumed message.
1051         * <P>
1052         * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
1053         * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
1054         * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
1055         * <P>
1056         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1057         * implicit acknowledgement modes.
1058         * <P>
1059         * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
1060         * an application-defined group (which is done by calling acknowledge on the last received message of the group,
1061         * thereby acknowledging all messages consumed by the session.)
1062         * <P>
1063         * Messages that have been received but not acknowledged may be redelivered.
1064         * @param caller - the message calling acknowledge on the session
1065         * 
1066         * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1067         * @throws javax.jms.IllegalStateException if this method is called on a closed session.
1068         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1069         */
1070        public void acknowledge(ActiveMQMessage caller) throws JMSException {
1071            checkClosed();
1072            /**
1073             * Find the caller and ensure it is marked as consumed
1074             * This is to ensure acknowledge called by a 
1075             * MessageListener works correctly
1076             */
1077            ActiveMQMessage msg = (ActiveMQMessage)deliveredMessages.get(caller);
1078            if (msg != null){
1079                msg.setMessageConsumed(true);
1080            }
1081           
1082            doAcknowledge(false);
1083        }
1084    
1085        protected void doAcknowledge(boolean isClosing) throws JMSException {
1086            if (!closed) {
1087                if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1088                    ActiveMQMessage msg = null;
1089                    while((msg = (ActiveMQMessage)deliveredMessages.removeFirst())!=null){
1090                        boolean messageConsumed = isClosing ? false : msg.isMessageConsumed();
1091                        if (!msg.isTransientConsumed()){
1092                            sendMessageAck(msg, messageConsumed, false);
1093                        }else {
1094                            if (!messageConsumed){
1095                                connection.addToTransientConsumedRedeliverCache(msg);
1096                            }
1097                        }
1098                    }
1099                    deliveredMessages.clear();
1100                }
1101            }
1102        }
1103    
1104        protected void beforeMessageDelivered(ActiveMQMessage message) {
1105            if (message != null && !closed) {
1106                deliveredMessages.add(message);
1107            }
1108        }
1109    
1110        protected void afterMessageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed,
1111                boolean messageExpired, boolean beforeCalled) {
1112            if (message != null && !closed) {
1113                if ((isClientAcknowledge() && !messageExpired) || (isTransacted() && message.isTransientConsumed())) {
1114                    message.setMessageConsumed(messageConsumed);
1115                    if (!beforeCalled) {
1116                        deliveredMessages.add(message);
1117                    }
1118                }
1119                else {
1120                    if (beforeCalled) {
1121                        deliveredMessages.remove(message);
1122                    }
1123                }
1124                //don't send acks for expired messages unless sendAcknowledge is set
1125                //the sendAcknowledge flag is set for all messages expect those destined
1126                //for transient Topic subscribers
1127                if (sendAcknowledge && !isClientAcknowledge()) {
1128                    try {
1129                        doStartTransaction();
1130                        sendMessageAck(message,messageConsumed,messageExpired);
1131                    }
1132                    catch (JMSException e) {
1133                        log.warn("failed to notify Broker that message is delivered", e);
1134                    }
1135                }
1136            }
1137        }
1138        
1139        /**
1140         * remove a temporary destination
1141         * @param destination
1142         * @throws JMSException if active subscribers already exist
1143         */
1144        public void removeTemporaryDestination(ActiveMQDestination destination) throws JMSException{
1145            this.connection.stopTemporaryDestination(destination);
1146        }
1147        
1148        private void sendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1149                throws JMSException {
1150            if (message.isMessagePart()) {
1151                ActiveMQMessage[] parts = (ActiveMQMessage[]) assemblies.remove(message.getParentMessageID());
1152                if (parts != null) {
1153                    for (int i = 0;i < parts.length;i++) {
1154                        parts[i].setConsumerIdentifer(message.getConsumerIdentifer());
1155                        doSendMessageAck(parts[i], messageConsumed, messageExpired);
1156                    }
1157                }
1158                else {
1159                    JMSException jmsEx = new JMSException("Could not find parts for fragemented message: " + message);
1160                    connection.onException(jmsEx);
1161                }
1162            }
1163            else {
1164                doSendMessageAck(message, messageConsumed, messageExpired);
1165            }
1166        }
1167        
1168        private void doSendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1169                throws JMSException {
1170            if (message != null && !message.isAdvisory()) {
1171                MessageAck ack = new MessageAck();
1172                ack.setConsumerId(message.getConsumerIdentifer());
1173                ack.setTransactionId(transactionContext.getTransactionId());
1174                ack.setExternalMessageId(message.isExternalMessageId());
1175                ack.setMessageID(message.getJMSMessageID());
1176                ack.setSequenceNumber(message.getSequenceNumber());
1177                ack.setProducerKey(message.getProducerKey());
1178                ack.setMessageRead(messageConsumed);
1179                ack.setDestination(message.getJMSActiveMQDestination());
1180                ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1181                ack.setExpired(messageExpired);
1182                ack.setSessionId(getSessionId());
1183                this.connection.asyncSendPacket(ack);
1184            }
1185        }
1186    
1187        /**
1188         * @param consumer
1189         * @throws JMSException
1190         */
1191        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1192            // ensure that the connection info is sent to the broker
1193            connection.sendConnectionInfoToBroker();
1194            // lets add the stat
1195            if (consumer.isDurableSubscriber()) {
1196                stats.onCreateDurableSubscriber();
1197            }
1198            ConsumerInfo info = createConsumerInfo(consumer);
1199            info.setStarted(true);
1200            //we add before notifying the server - as messages could
1201            //start to be dispatched before receipt from syncSend()
1202            //is returned
1203            this.consumers.add(consumer);
1204            if (started.get()){
1205                connection.replayTransientConsumedRedeliveredMessages(this,consumer);
1206            }
1207            try {
1208                this.connection.syncSendPacket(info);
1209            }
1210            catch (JMSException jmsEx) {
1211                this.consumers.remove(consumer);
1212                throw jmsEx;
1213            }
1214        }
1215    
1216        /**
1217         * @param consumer
1218         * @throws JMSException
1219         */
1220        protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1221            this.consumers.remove(consumer);
1222            // lets remove the stat
1223            if (consumer.isDurableSubscriber()) {
1224                stats.onRemoveDurableSubscriber();
1225            }
1226            if (!closed) {
1227                ConsumerInfo info = createConsumerInfo(consumer);
1228                info.setStarted(false);
1229                this.connection.asyncSendPacket(info, false);
1230            }
1231        }
1232    
1233        protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1234            ConsumerInfo info = new ConsumerInfo();
1235            info.setConsumerId(consumer.consumerIdentifier);
1236            info.setClientId(connection.clientID);
1237            info.setSessionId(this.sessionId);
1238            info.setConsumerNo(consumer.consumerNumber);
1239            info.setPrefetchNumber(consumer.prefetchNumber);
1240            info.setDestination(consumer.destination);
1241            info.setNoLocal(consumer.noLocal);
1242            info.setBrowser(consumer.browser);
1243            info.setSelector(consumer.messageSelector);
1244            info.setStartTime(consumer.startTime);
1245            info.setConsumerName(consumer.consumerName);
1246            return info;
1247        }
1248    
1249        /**
1250         * @param producer
1251         * @throws JMSException
1252         */
1253        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1254            // ensure that the connection info is sent to the broker
1255            connection.sendConnectionInfoToBroker();
1256            //start listening for advisories if the destination is temporary
1257            this.connection.startAdvisoryForTempDestination(producer.defaultDestination);
1258            producer.setProducerId(connection.handleIdGenerator.getNextShortSequence());
1259            ProducerInfo info = createProducerInfo(producer);
1260            info.setStarted(true);
1261            this.connection.asyncSendPacket(info);
1262            this.producers.add(producer);
1263        }
1264    
1265        /**
1266         * @param producer
1267         * @throws JMSException
1268         */
1269        protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1270            this.producers.remove(producer);
1271            if (!closed) {
1272                this.connection.stopAdvisoryForTempDestination(producer.defaultDestination);
1273                ProducerInfo info = createProducerInfo(producer);
1274                info.setStarted(false);
1275                this.connection.asyncSendPacket(info, false);
1276            }
1277        }
1278    
1279        protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1280            ProducerInfo info = new ProducerInfo();
1281            info.setProducerId(producer.getProducerId());
1282            info.setClientId(connection.clientID);
1283            info.setSessionId(this.sessionId);
1284            info.setDestination(producer.defaultDestination);
1285            info.setStartTime(producer.getStartTime());
1286            return info;
1287        }
1288    
1289        /**
1290         * Start this Session
1291         * @throws JMSException
1292         */
1293        protected void start() throws JMSException {
1294            started.set(true);
1295            for (Iterator i = consumers.iterator(); i.hasNext();){
1296                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
1297                connection.replayTransientConsumedRedeliveredMessages(this,consumer);
1298            }
1299            messageExecutor.start();
1300        }
1301    
1302        /**
1303         * Stop this Session
1304         */
1305        protected void stop() {
1306            started.set(false);
1307            messageExecutor.stop();
1308        }
1309    
1310        /**
1311         * @return Returns the sessionId.
1312         */
1313        protected short getSessionId() {
1314            return sessionId;
1315        }
1316    
1317        /**
1318         * @param sessionId The sessionId to set.
1319         */
1320        protected void setSessionId(short sessionId) {
1321            this.sessionId = sessionId;
1322        }
1323    
1324        /**
1325         * @return Returns the startTime.
1326         */
1327        protected long getStartTime() {
1328            return startTime;
1329        }
1330    
1331        /**
1332         * @param startTime The startTime to set.
1333         */
1334        protected void setStartTime(long startTime) {
1335            this.startTime = startTime;
1336        }
1337    
1338        /**
1339         * send the message for dispatch by the broker
1340         * 
1341         * @param producer
1342         * @param destination
1343         * @param message
1344         * @param deliveryMode
1345         * @param priority
1346         * @param timeToLive
1347         * @throws JMSException
1348         */
1349        protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1350                int priority, long timeToLive, boolean reuseMessageId) throws JMSException {
1351            checkClosed();
1352            // ensure that the connection info is sent to the broker
1353            connection.sendConnectionInfoToBroker();
1354            // tell the Broker we are about to start a new transaction
1355            doStartTransaction();
1356            message.setJMSDestination(destination);
1357            message.setJMSDeliveryMode(deliveryMode);
1358            message.setJMSPriority(priority);
1359            long expiration = 0L;
1360            if (!producer.getDisableMessageTimestamp()) {
1361                long timeStamp = System.currentTimeMillis();
1362                message.setJMSTimestamp(timeStamp);
1363                if (timeToLive > 0) {
1364                    expiration = timeToLive + timeStamp;
1365                }
1366            }
1367            message.setJMSExpiration(expiration);
1368            String id = message.getJMSMessageID();
1369            String producerKey = producer.getProducerMessageKey();
1370            long sequenceNumber = producer.getIdGenerator().getNextSequence();
1371            
1372            if ((id == null || id.length() == 0) || !producer.getDisableMessageID() && !reuseMessageId) {
1373                message.setJMSMessageID(producerKey + sequenceNumber);
1374            }
1375            //transform to our own message format here
1376            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1377            if (connection.isCopyMessageOnSend()){
1378                msg = msg.shallowCopy();
1379            }
1380            //clear identity - incase forwared on
1381            msg.setJMSMessageIdentity(null);
1382            msg.setExternalMessageId(id != null);
1383            msg.setSequenceNumber(sequenceNumber);
1384            msg.setProducerKey(producerKey);
1385            msg.setTransactionId(transactionContext.getTransactionId());
1386            msg.setJMSClientID(this.connection.clientID);
1387            msg.setMesssageHandle(producer.getProducerId());
1388            //reset state as could be forwarded on
1389            msg.setJMSRedelivered(false);
1390            if (!connection.isInternalConnection()){
1391                msg.clearBrokersVisited();
1392                connection.validateDestination(msg.getJMSActiveMQDestination());
1393            }
1394            
1395            if (this.connection.isPrepareMessageBodyOnSend()){
1396                msg.prepareMessageBody();
1397            }
1398            //do message payload compression
1399            if (connection.isDoMessageCompression()){
1400                try {
1401                    msg.getBodyAsBytes(compression);
1402                }
1403                catch (IOException e) {
1404                    JMSException jmsEx = new JMSException("Failed to compress message payload");
1405                    jmsEx.setLinkedException(e);
1406                    throw jmsEx;
1407                }
1408            }
1409            boolean fragmentedMessage = connection.isDoMessageFragmentation();
1410            if (fragmentedMessage && !msg.isMessagePart()){
1411                try {
1412                    fragmentedMessage = fragmentation.doFragmentation(msg.getBodyAsBytes());
1413                    if (fragmentedMessage){
1414                        ByteArray[] array = fragmentation.fragment(msg.getBodyAsBytes());
1415                        String parentMessageId = msg.getJMSMessageID();
1416                        for (int i = 0; i < array.length; i++){
1417                            ActiveMQMessage fragment = msg.shallowCopy();
1418                            fragment.setJMSMessageID(null);
1419                            fragment.setMessagePart(true);
1420                            fragment.setParentMessageID(parentMessageId);
1421                            fragment.setNumberOfParts((short)array.length);
1422                            fragment.setPartNumber((short)i);
1423                            if (i != 0){
1424                                fragment.setSequenceNumber(producer.getIdGenerator().getNextSequence());
1425                            }
1426                            fragment.setBodyAsBytes(array[i]);
1427                            if (this.connection.isUseAsyncSend()) {
1428                                this.connection.asyncSendPacket(fragment);
1429                            }
1430                            else {
1431                                this.connection.syncSendPacket(fragment);
1432                            }
1433                            
1434                        }
1435                    }
1436                }catch (IOException e) {
1437                    JMSException jmsEx = new JMSException("Failed to fragment message payload");
1438                    jmsEx.setLinkedException(e);
1439                    throw jmsEx;
1440                }
1441            }
1442            if (log.isDebugEnabled()) {
1443                log.debug("Sending message: " + msg);
1444            }
1445            
1446            if (!fragmentedMessage){
1447                if (this.connection.isUseAsyncSend() || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
1448                    this.connection.asyncSendPacket(msg);
1449                }
1450                else {
1451                    this.connection.syncSendPacket(msg);
1452                }
1453            }
1454        }
1455    
1456        /**
1457         * Send TransactionInfo to indicate transaction has started
1458         * 
1459         * @throws JMSException if some internal error occurs
1460         */
1461        protected void doStartTransaction() throws JMSException {
1462            if (getTransacted() && !transactionContext.isInXATransaction()) {
1463                transactionContext.begin();
1464            }
1465        }
1466    
1467        protected void setSessionConsumerDispatchState(int value) throws JMSException {
1468            if (consumerDispatchState != ActiveMQSession.CONSUMER_DISPATCH_UNSET && value != consumerDispatchState) {
1469                String errorStr = "Cannot mix consumer dispatching on a session - already: ";
1470                if (value == ActiveMQSession.CONSUMER_DISPATCH_SYNC) {
1471                    errorStr += "synchronous";
1472                }
1473                else {
1474                    errorStr += "asynchronous";
1475                }
1476                throw new IllegalStateException(errorStr);
1477            }
1478            consumerDispatchState = value;
1479        }
1480    
1481        protected void redeliverUnacknowledgedMessages() {
1482            redeliverUnacknowledgedMessages(false);
1483        }
1484    
1485        protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) {
1486            messageExecutor.stop();
1487            LinkedList replay = new LinkedList();
1488            Object obj = null;
1489            while ((obj = deliveredMessages.removeFirst()) != null) {
1490                replay.add(obj);
1491            }
1492            
1493            deliveredMessages.clear();
1494            if (!replay.isEmpty()) {
1495                for (ListIterator i = replay.listIterator(replay.size());i.hasPrevious();) {
1496                    ActiveMQMessage msg = (ActiveMQMessage) i.previous();
1497                    if (!onlyDeliverTransientConsumed || msg.isTransientConsumed()) {
1498                        msg.setJMSRedelivered(true);
1499                        msg.incrementDeliveryCount();
1500                        messageExecutor.executeFirst(msg);
1501                    }
1502                }
1503            }
1504            replay.clear();
1505            messageExecutor.start();
1506        }
1507    
1508        protected void clearMessagesInProgress() {
1509            messageExecutor.clearMessagesInProgress();
1510            for (Iterator i = consumers.iterator();i.hasNext();) {
1511                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1512                consumer.clearMessagesInProgress();
1513            }
1514        }
1515        
1516        public boolean hasUncomsumedMessages() {
1517            return messageExecutor.hasUncomsumedMessages();
1518        }
1519        
1520        public boolean isTransacted() {
1521            return this.acknowledgeMode == Session.SESSION_TRANSACTED;
1522        }
1523    
1524        protected boolean isClientAcknowledge() {
1525            return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
1526        }
1527        
1528            /**
1529         * @return Returns the internalSession.
1530         */
1531        public boolean isInternalSession() {
1532            return internalSession;
1533        }
1534        /**
1535         * @param internalSession The internalSession to set.
1536         */
1537        public void setInternalSession(boolean internalSession) {
1538            this.internalSession = internalSession;
1539        }
1540        
1541        
1542        private final ActiveMQMessage assembleMessage(ActiveMQMessage message) {
1543            ActiveMQMessage result = message;
1544            if (message != null && !connection.isInternalConnection() && message.isMessagePart()) {
1545                if (message.getNumberOfParts() == 1) {
1546                    //passed though from another session - i.e.
1547                    //a network or remote connection and now assembled
1548                    message.resetMessagePart();
1549                    result = message;
1550                }
1551                else {
1552                    result = null;
1553                    String parentId = message.getParentMessageID();
1554                    ActiveMQMessage[] array = (ActiveMQMessage[]) assemblies.get(parentId);
1555                    if (array == null) {
1556                        array = new ActiveMQMessage[message.getNumberOfParts()];
1557                        assemblies.put(parentId, array);
1558                    }
1559                    array[message.getPartNumber()] = message;
1560                    boolean complete = true;
1561                    for (int i = 0;i < array.length;i++) {
1562                        complete &= array[i] != null;
1563                    }
1564                    if (complete) {
1565                        result = array[0];
1566                        ByteArray[] bas = new ByteArray[array.length];
1567                        try {
1568                            for (int i = 0;i < bas.length;i++) {
1569                                bas[i] = array[i].getBodyAsBytes();
1570                                if (i >= 1){
1571                                    array[i].clearBody();
1572                                }
1573                            }
1574                            ByteArray ba = fragmentation.assemble(bas);
1575                            result.setBodyAsBytes(ba);
1576                        }
1577                        catch (IOException ioe) {
1578                            JMSException jmsEx = new JMSException("Failed to assemble fragment message: " + parentId);
1579                            jmsEx.setLinkedException(ioe);
1580                            this.connection.onException(jmsEx);
1581                        }catch(JMSException jmsEx){
1582                            this.connection.onException(jmsEx);  
1583                        }
1584                    }
1585                }
1586            }
1587            return result;
1588        }
1589    
1590        public DeliveryListener getDeliveryListener() {
1591            return deliveryListener;
1592        }
1593        
1594    
1595        public void setDeliveryListener(DeliveryListener deliveryListener) {
1596            this.deliveryListener = deliveryListener;
1597        }
1598        
1599    }