View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq;
19  
20  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
21  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.activemq.management.JMSSessionStatsImpl;
25  import org.codehaus.activemq.management.StatsCapable;
26  import org.codehaus.activemq.message.*;
27  import org.codehaus.activemq.ra.LocalTransactionEventListener;
28  import org.codehaus.activemq.util.IdGenerator;
29  
30  import javax.jms.*;
31  import javax.jms.IllegalStateException;
32  import javax.management.j2ee.statistics.Stats;
33  import java.io.Serializable;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.LinkedHashSet;
37  import java.util.LinkedList;
38  
39  /***
40   * <P>
41   * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
42   * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
43   * <P>
44   * A session serves several purposes:
45   * <UL>
46   * <LI>It is a factory for its message producers and consumers.
47   * <LI>It supplies provider-optimized message factories.
48   * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
49   * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
50   * dynamically manipulate provider-specific destination names.
51   * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
52   * units.
53   * <LI>It defines a serial order for the messages it consumes and the messages it produces.
54   * <LI>It retains messages it consumes until they have been acknowledged.
55   * <LI>It serializes execution of message listeners registered with its message consumers.
56   * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
57   * </UL>
58   * <P>
59   * A session can create and service multiple message producers and consumers.
60   * <P>
61   * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
62   * The thread may then use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
63   * <P>
64   * If a client desires to have one thread produce messages while others consume them, the client should use a separate
65   * session for its producing thread.
66   * <P>
67   * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
68   * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
69   * constituent objects from another thread of control. The only exception to this rule is the use of the session or
70   * connection <CODE>close</CODE> method.
71   * <P>
72   * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
73   * start simply and incrementally add message processing complexity as their need for concurrency grows.
74   * <P>
75   * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
76   * being executed in another thread.
77   * <P>
78   * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
79   * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
80   * transactions organize a session's input message stream and output message stream into series of atomic units. When a
81   * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
82   * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
83   * recovered.
84   * <P>
85   * The content of a transaction's input and output units is simply those messages that have been produced and consumed
86   * within the session's current transaction.
87   * <P>
88   * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
89   * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that
90   * a transacted session always has a current transaction within which its work is done.
91   * <P>
92   * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
93   * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
94   * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback
95   * </CODE> methods in this context is prohibited.
96   * <P>
97   * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
98   * <P>
99   * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
100  * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
101  * JMS API into their application server products.
102  *
103  * @version $Revision: 1.38 $
104  * @see javax.jms.Session
105  * @see javax.jms.QueueSession
106  * @see javax.jms.TopicSession
107  * @see javax.jms.XASession
108  */
109 public class ActiveMQSession
110         implements
111         Session,
112         QueueSession,
113         TopicSession,
114         ActiveMQMessageDispatcher,
115         MessageAcknowledge,
116         StatsCapable {
117     private static final Log log = LogFactory.getLog(ActiveMQSession.class);
118     protected ActiveMQConnection connection;
119     private int acknowledgeMode;
120     protected CopyOnWriteArrayList consumers;
121     protected CopyOnWriteArrayList producers;
122     private IdGenerator transactionIdGenerator;
123     private IdGenerator temporaryDestinationGenerator;
124     protected IdGenerator packetIdGenerator;
125     private IdGenerator producerIdGenerator;
126     private IdGenerator consumerIdGenerator;
127     private MessageListener messageListener;
128     protected SynchronizedBoolean closed;
129     private SynchronizedBoolean startTransaction;
130     private String sessionId;
131     protected String currentTransactionId;
132     private long startTime;
133     private Object deliveryMutex;
134     private LocalTransactionEventListener localTransactionEventListener;
135     /***
136      * deliveredMessages is only used if this Session is in Client Acknowledge mode
137      */
138     private LinkedList deliveredMessages;
139     /***
140      * Messages inbound for from a ConnectionConsumer
141      */
142     private LinkedList inboundMessages;
143     private JMSSessionStatsImpl stats;
144     //private Object counterLock = new Object();
145     //private int messageCount;
146     //private int autoAcknowledgeBatchSize = 1;
147 
148     /***
149      * Construct the Session
150      *
151      * @param theConnection
152      * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
153      * @throws JMSException on internal error
154      */
155     protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
156         this.connection = theConnection;
157         this.acknowledgeMode = theAcknowledgeMode;
158         this.consumers = new CopyOnWriteArrayList();
159         this.producers = new CopyOnWriteArrayList();
160         this.producerIdGenerator = new IdGenerator();
161         this.consumerIdGenerator = new IdGenerator();
162         this.transactionIdGenerator = new IdGenerator();
163         this.temporaryDestinationGenerator = new IdGenerator();
164         this.packetIdGenerator = new IdGenerator();
165         this.closed = new SynchronizedBoolean(false);
166         this.startTransaction = new SynchronizedBoolean(false);
167         this.sessionId = connection.generateSessionId();
168         this.startTime = System.currentTimeMillis();
169         this.deliveredMessages = new LinkedList();
170         this.inboundMessages = new LinkedList();
171         this.deliveryMutex = new Object();
172         if (getTransacted()) {
173             this.currentTransactionId = getNextTransactionId();
174         }
175         connection.addSession(this);
176         stats = new JMSSessionStatsImpl(producers, consumers);
177     }
178 
179     public Stats getStats() {
180         return stats;
181     }
182 
183     public JMSSessionStatsImpl getSessionStats() {
184         return stats;
185     }
186 
187     /***
188      * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
189      * containing a stream of uninterpreted bytes.
190      *
191      * @return the an ActiveMQBytesMessage
192      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
193      */
194     public BytesMessage createBytesMessage() throws JMSException {
195         checkClosed();
196         return new ActiveMQBytesMessage();
197     }
198 
199     /***
200      * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
201      * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
202      * Java programming language.
203      *
204      * @return an ActiveMQMapMessage
205      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
206      */
207     public MapMessage createMapMessage() throws JMSException {
208         checkClosed();
209         return new ActiveMQMapMessage();
210     }
211 
212     /***
213      * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
214      * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
215      * a message containing only header information is sufficient.
216      *
217      * @return an ActiveMQMessage
218      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
219      */
220     public Message createMessage() throws JMSException {
221         checkClosed();
222         return new ActiveMQMessage();
223     }
224 
225     /***
226      * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
227      * that contains a serializable Java object.
228      *
229      * @return an ActiveMQObjectMessage
230      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
231      */
232     public ObjectMessage createObjectMessage() throws JMSException {
233         checkClosed();
234         return new ActiveMQObjectMessage();
235     }
236 
237     /***
238      * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
239      * send a message that contains a serializable Java object.
240      *
241      * @param object the object to use to initialize this message
242      * @return an ActiveMQObjectMessage
243      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
244      */
245     public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
246         checkClosed();
247         ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
248         msg.setObject(object);
249         return msg;
250     }
251 
252     /***
253      * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
254      * self-defining stream of primitive values in the Java programming language.
255      *
256      * @return an ActiveMQStreamMessage
257      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
258      */
259     public StreamMessage createStreamMessage() throws JMSException {
260         checkClosed();
261         return new ActiveMQStreamMessage();
262     }
263 
264     /***
265      * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
266      * containing a <CODE>String</CODE> object.
267      *
268      * @return an ActiveMQTextMessage
269      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
270      */
271     public TextMessage createTextMessage() throws JMSException {
272         checkClosed();
273         return new ActiveMQTextMessage();
274     }
275 
276     /***
277      * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
278      * message containing a <CODE>String</CODE>.
279      *
280      * @param text the string used to initialize this message
281      * @return an ActiveMQTextMessage
282      * @throws JMSException if the JMS provider fails to create this message due to some internal error.
283      */
284     public TextMessage createTextMessage(String text) throws JMSException {
285         checkClosed();
286         ActiveMQTextMessage msg = new ActiveMQTextMessage();
287         msg.setText(text);
288         return msg;
289     }
290 
291     /***
292      * Indicates whether the session is in transacted mode.
293      *
294      * @return true if the session is in transacted mode
295      * @throws JMSException if there is some internal error.
296      */
297     public boolean getTransacted() throws JMSException {
298         checkClosed();
299         return this.acknowledgeMode == Session.SESSION_TRANSACTED;
300     }
301 
302     /***
303      * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
304      * created. If the session is transacted, the acknowledgement mode is ignored.
305      *
306      * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
307      *         session is transacted, returns SESSION_TRANSACTED.
308      * @throws JMSException
309      * @see javax.jms.Connection#createSession(boolean,int)
310      * @since 1.1 exception JMSException if there is some internal error.
311      */
312     public int getAcknowledgeMode() throws JMSException {
313         checkClosed();
314         return this.acknowledgeMode;
315     }
316 
317     /***
318      * Commits all messages done in this transaction and releases any locks currently held.
319      *
320      * @throws JMSException                   if the JMS provider fails to commit the transaction due to some internal error.
321      * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
322      *                                        commit.
323      * @throws javax.jms.IllegalStateException
324      *                                        if the method is not called by a transacted session.
325      */
326     public void commit() throws JMSException {
327         checkClosed();
328         if (!getTransacted()) {
329             throw new javax.jms.IllegalStateException("Not a transacted session");
330         }
331         // Only send commit if the transaction was started.
332         if (this.startTransaction.commit(true, false)) {
333             TransactionInfo info = new TransactionInfo();
334             info.setId(this.packetIdGenerator.generateId());
335             info.setTransactionId(currentTransactionId);
336             info.setType(TransactionInfo.COMMIT);
337             //before we send, update the current transaction id
338             this.currentTransactionId = getNextTransactionId();
339             // Notify the listener that the tx was commited back
340             this.connection.syncSendPacket(info);
341             if (localTransactionEventListener != null) {
342                 localTransactionEventListener.commitEvent();
343             }
344         }
345     }
346 
347     /***
348      * Rolls back any messages done in this transaction and releases any locks currently held.
349      *
350      * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
351      * @throws javax.jms.IllegalStateException
352      *                      if the method is not called by a transacted session.
353      */
354     public void rollback() throws JMSException {
355         checkClosed();
356         if (!getTransacted()) {
357             throw new javax.jms.IllegalStateException("Not a transacted session");
358         }
359         // Only rollback commit if the transaction was started.
360         if (this.startTransaction.commit(true, false)) {
361             TransactionInfo info = new TransactionInfo();
362             info.setId(this.packetIdGenerator.generateId());
363             info.setTransactionId(currentTransactionId);
364             info.setType(TransactionInfo.ROLLBACK);
365             //before we send, update the current transaction id
366             this.currentTransactionId = getNextTransactionId();
367             this.connection.asyncSendPacket(info);
368             // Notify the listener that the tx was rolled back
369             if (localTransactionEventListener != null) {
370                 localTransactionEventListener.rollbackEvent();
371             }
372         }
373     }
374 
375     /***
376      * Closes the session.
377      * <P>
378      * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
379      * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
380      * be timely enough.
381      * <P>
382      * There is no need to close the producers and consumers of a closed session.
383      * <P>
384      * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A
385      * blocked message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
386      * <P>
387      * Closing a transacted session must roll back the transaction in progress.
388      * <P>
389      * This method is the only <CODE>Session</CODE> method that can be called concurrently.
390      * <P>
391      * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
392      * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
393      *
394      * @throws JMSException if the JMS provider fails to close the session due to some internal error.
395      */
396     public void close() throws JMSException {
397         if (!this.closed.get()) {
398             for (Iterator i = consumers.iterator(); i.hasNext();) {
399                 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
400                 consumer.close();
401             }
402             for (Iterator i = producers.iterator(); i.hasNext();) {
403                 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
404                 producer.close();
405             }
406             consumers.clear();
407             producers.clear();
408             this.connection.removeSession(this);
409             inboundMessages.clear();
410             deliveredMessages.clear();
411             closed.set(true);
412         }
413     }
414 
415     /***
416      * @throws IllegalStateException if the Session is closed
417      */
418     protected void checkClosed() throws IllegalStateException {
419         if (this.closed.get()) {
420             throw new IllegalStateException("The Consumer is closed");
421         }
422     }
423 
424     /***
425      * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
426      * <P>
427      * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
428      * all messages that have been delivered to the client.
429      * <P>
430      * Restarting a session causes it to take the following actions:
431      * <UL>
432      * <LI>Stop message delivery
433      * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
434      * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
435      * Redelivered messages do not have to be delivered in exactly their original delivery order.
436      * </UL>
437      *
438      * @throws JMSException          if the JMS provider fails to stop and restart message delivery due to some internal error.
439      * @throws IllegalStateException if the method is called by a transacted session.
440      */
441     public void recover() throws JMSException {
442         checkClosed();
443         if (getTransacted()) {
444             throw new IllegalStateException("This session is transacted");
445         }
446         synchronized (deliveryMutex) {
447             HashSet replay = new LinkedHashSet();
448             replay.addAll(deliveredMessages);
449             replay.addAll(inboundMessages);
450             deliveredMessages.clear();
451             inboundMessages.clear();
452             for (Iterator i = replay.iterator(); i.hasNext();) {
453                 ActiveMQMessage msg = (ActiveMQMessage) i.next();
454                 inboundMessages.remove(msg);//else they will be added twice
455                 msg.setJMSRedelivered(true);
456                 dispatch(msg);
457             }
458             replay.clear();
459         }
460     }
461 
462     /***
463      * Returns the session's distinguished message listener (optional).
464      *
465      * @return the message listener associated with this session
466      * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
467      * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
468      * @see javax.jms.ServerSessionPool
469      * @see javax.jms.ServerSession
470      */
471     public MessageListener getMessageListener() throws JMSException {
472         checkClosed();
473         return this.messageListener;
474     }
475 
476     /***
477      * Sets the session's distinguished message listener (optional).
478      * <P>
479      * When the distinguished message listener is set, no other form of message receipt in the session can be used;
480      * however, all forms of sending messages are still supported.
481      * <P>
482      * This is an expert facility not used by regular JMS clients.
483      *
484      * @param listener the message listener to associate with this session
485      * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
486      * @see javax.jms.Session#getMessageListener()
487      * @see javax.jms.ServerSessionPool
488      * @see javax.jms.ServerSession
489      */
490     public void setMessageListener(MessageListener listener) throws JMSException {
491         checkClosed();
492         this.messageListener = listener;
493     }
494 
495     /***
496      * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
497      *
498      * @see javax.jms.ServerSession
499      */
500     public void run() {
501         MessageListener listener = this.messageListener;
502         //TODO this "doRemove" crap is an ugly hack to get rid of a concurrent modification exception.
503         boolean doRemove = this.acknowledgeMode != Session.CLIENT_ACKNOWLEDGE;
504         synchronized (inboundMessages) {
505             for (Iterator i = inboundMessages.iterator(); i.hasNext();) {
506                 ActiveMQMessage message = (ActiveMQMessage) i.next();
507                 if (listener != null) {
508                     try {
509                         listener.onMessage(message);
510                         this.messageDelivered(true, message, true, false);
511                     }
512                     catch (Throwable t) {
513                         log.info("Caught :" + t, t);
514                         this.messageDelivered(true, message, false, false);
515                     }
516                 }
517                 else {
518                     this.messageDelivered(true, message, false, false);
519                 }
520                 if (doRemove) {
521                     i.remove();
522                 }
523             }
524         }
525     }
526 
527     /***
528      * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
529      * <P>
530      * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
531      * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
532      * destination parameter to create a <CODE>MessageProducer</CODE> object.
533      *
534      * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have
535      *                    a specified destination.
536      * @return the MessageProducer
537      * @throws JMSException                if the session fails to create a MessageProducer due to some internal error.
538      * @throws InvalidDestinationException if an invalid destination is specified.
539      * @since 1.1
540      */
541     public MessageProducer createProducer(Destination destination) throws JMSException {
542         checkClosed();
543         return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
544     }
545 
546     /***
547      * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
548      * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
549      * create a <CODE>MessageConsumer</CODE>.
550      *
551      * @param destination the <CODE>Destination</CODE> to access.
552      * @return the MessageConsumer
553      * @throws JMSException                if the session fails to create a consumer due to some internal error.
554      * @throws InvalidDestinationException if an invalid destination is specified.
555      * @since 1.1
556      */
557     public MessageConsumer createConsumer(Destination destination) throws JMSException {
558         checkClosed();
559         int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
560                 .getPrefetchPolicy().getQueuePrefetch();
561         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
562                 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
563     }
564 
565     /***
566      * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
567      * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
568      * destination parameter to create a <CODE>MessageConsumer</CODE>.
569      * <P>
570      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
571      *
572      * @param destination     the <CODE>Destination</CODE> to access
573      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
574      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
575      * @return the MessageConsumer
576      * @throws JMSException                if the session fails to create a MessageConsumer due to some internal error.
577      * @throws InvalidDestinationException if an invalid destination is specified.
578      * @throws InvalidSelectorException    if the message selector is invalid.
579      * @since 1.1
580      */
581     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
582         checkClosed();
583         int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
584                 .getPrefetchPolicy().getQueuePrefetch();
585         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
586                 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
587     }
588 
589     /***
590      * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
591      * specify whether messages published by its own connection should be delivered to it, if the destination is a
592      * topic.
593      * <P>
594      * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
595      * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
596      * <P>
597      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
598      * destination.
599      * <P>
600      * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
601      * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
602      * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
603      * topics.
604      *
605      * @param destination     the <CODE>Destination</CODE> to access
606      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
607      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
608      * @param NoLocal         - if true, and the destination is a topic, inhibits the delivery of messages published by its own
609      *                        connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
610      * @return the MessageConsumer
611      * @throws JMSException                if the session fails to create a MessageConsumer due to some internal error.
612      * @throws InvalidDestinationException if an invalid destination is specified.
613      * @throws InvalidSelectorException    if the message selector is invalid.
614      * @since 1.1
615      */
616     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
617             throws JMSException {
618         checkClosed();
619         int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
620         return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
621                 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
622     }
623 
624     /***
625      * Creates a queue identity given a <CODE>Queue</CODE> name.
626      * <P>
627      * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
628      * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
629      * not portable.
630      * <P>
631      * Note that this method is not for creating the physical queue. The physical creation of queues is an
632      * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
633      * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
634      *
635      * @param queueName the name of this <CODE>Queue</CODE>
636      * @return a <CODE>Queue</CODE> with the given name
637      * @throws JMSException if the session fails to create a queue due to some internal error.
638      * @since 1.1
639      */
640     public Queue createQueue(String queueName) throws JMSException {
641         checkClosed();
642         return new ActiveMQQueue(queueName);
643     }
644 
645     /***
646      * Creates a topic identity given a <CODE>Topic</CODE> name.
647      * <P>
648      * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
649      * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
650      * not portable.
651      * <P>
652      * Note that this method is not for creating the physical topic. The physical creation of topics is an
653      * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
654      * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
655      *
656      * @param topicName the name of this <CODE>Topic</CODE>
657      * @return a <CODE>Topic</CODE> with the given name
658      * @throws JMSException if the session fails to create a topic due to some internal error.
659      * @since 1.1
660      */
661     public Topic createTopic(String topicName) throws JMSException {
662         checkClosed();
663         return new ActiveMQTopic(topicName);
664     }
665     /***
666      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
667      *
668      * @param queue the <CODE>queue</CODE> to access
669      * @exception InvalidDestinationException if an invalid destination is specified
670      * @since 1.1
671      */
672     /***
673      * Creates a durable subscriber to the specified topic.
674      * <P>
675      * If a client needs to receive all the messages published on a topic, including the ones published while the
676      * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
677      * this durable subscription and insures that all messages from the topic's publishers are retained until they are
678      * acknowledged by this durable subscriber or they have expired.
679      * <P>
680      * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
681      * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only
682      * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
683      * <P>
684      * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
685      * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
686      * unsubscribing (deleting) the old one and creating a new one.
687      * <P>
688      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
689      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
690      * value for this attribute is false.
691      *
692      * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
693      * @param name  the name used to identify this subscription
694      * @return the TopicSubscriber
695      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
696      * @throws InvalidDestinationException if an invalid topic is specified.
697      * @since 1.1
698      */
699     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
700         checkClosed();
701         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
702                 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
703                 false, false);
704     }
705 
706     /***
707      * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
708      * published by its own connection should be delivered to it.
709      * <P>
710      * If a client needs to receive all the messages published on a topic, including the ones published while the
711      * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
712      * this durable subscription and insures that all messages from the topic's publishers are retained until they are
713      * acknowledged by this durable subscriber or they have expired.
714      * <P>
715      * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
716      * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
717      * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
718      * inactive durable subscriber is one that exists but does not currently have a message consumer associated with
719      * it.
720      * <P>
721      * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
722      * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
723      * unsubscribing (deleting) the old one and creating a new one.
724      *
725      * @param topic           the non-temporary <CODE>Topic</CODE> to subscribe to
726      * @param name            the name used to identify this subscription
727      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
728      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
729      * @param noLocal         if set, inhibits the delivery of messages published by its own connection
730      * @return the Queue Browser
731      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
732      * @throws InvalidDestinationException if an invalid topic is specified.
733      * @throws InvalidSelectorException    if the message selector is invalid.
734      * @since 1.1
735      */
736     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
737             throws JMSException {
738         checkClosed();
739         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
740                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
741                 .getDurableTopicPrefetch(), noLocal, false);
742     }
743 
744     /***
745      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
746      *
747      * @param queue the <CODE>queue</CODE> to access
748      * @return the Queue Browser
749      * @throws JMSException                if the session fails to create a browser due to some internal error.
750      * @throws InvalidDestinationException if an invalid destination is specified
751      * @since 1.1
752      */
753     public QueueBrowser createBrowser(Queue queue) throws JMSException {
754         checkClosed();
755         return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue), "",
756                 this.connection.getNextConsumerNumber());
757     }
758 
759     /***
760      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
761      * selector.
762      *
763      * @param queue           the <CODE>queue</CODE> to access
764      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
765      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
766      * @return the Queue Browser
767      * @throws JMSException                if the session fails to create a browser due to some internal error.
768      * @throws InvalidDestinationException if an invalid destination is specified
769      * @throws InvalidSelectorException    if the message selector is invalid.
770      * @since 1.1
771      */
772     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
773         checkClosed();
774         return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue),
775                 messageSelector, this.connection.getNextConsumerNumber());
776     }
777 
778     /***
779      * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE>
780      * unless it is deleted earlier.
781      *
782      * @return a temporary queue identity
783      * @throws JMSException if the session fails to create a temporary queue due to some internal error.
784      * @since 1.1
785      */
786     public TemporaryQueue createTemporaryQueue() throws JMSException {
787         checkClosed();
788         String tempQueueName = "TemporaryQueue-" + ActiveMQDestination.createTemporaryName(this.connection.getClientID());
789         tempQueueName += this.temporaryDestinationGenerator.generateId();
790         return new ActiveMQTemporaryQueue(tempQueueName);
791     }
792 
793     /***
794      * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE>
795      * unless it is deleted earlier.
796      *
797      * @return a temporary topic identity
798      * @throws JMSException if the session fails to create a temporary topic due to some internal error.
799      * @since 1.1
800      */
801     public TemporaryTopic createTemporaryTopic() throws JMSException {
802         checkClosed();
803         String tempTopicName = "TemporaryTopic-" + ActiveMQDestination.createTemporaryName(this.connection.getClientID());
804         tempTopicName += this.temporaryDestinationGenerator.generateId();
805         return new ActiveMQTemporaryTopic(tempTopicName);
806     }
807 
808     /***
809      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
810      *
811      * @param queue the <CODE>Queue</CODE> to access
812      * @return @throws JMSException if the session fails to create a receiver due to some internal error.
813      * @throws JMSException
814      * @throws InvalidDestinationException if an invalid queue is specified.
815      */
816     public QueueReceiver createReceiver(Queue queue) throws JMSException {
817         checkClosed();
818         return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
819                 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
820     }
821 
822     /***
823      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
824      * selector.
825      *
826      * @param queue           the <CODE>Queue</CODE> to access
827      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
828      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
829      * @return QueueReceiver
830      * @throws JMSException                if the session fails to create a receiver due to some internal error.
831      * @throws InvalidDestinationException if an invalid queue is specified.
832      * @throws InvalidSelectorException    if the message selector is invalid.
833      */
834     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
835         checkClosed();
836         return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
837                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
838                 .getQueuePrefetch());
839     }
840 
841     /***
842      * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
843      *
844      * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
845      * @return QueueSender
846      * @throws JMSException                if the session fails to create a sender due to some internal error.
847      * @throws InvalidDestinationException if an invalid queue is specified.
848      */
849     public QueueSender createSender(Queue queue) throws JMSException {
850         checkClosed();
851         return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
852     }
853 
854     /***
855      * Creates a nondurable subscriber to the specified topic. <p/>
856      * <P>
857      * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
858      * <p/>
859      * <P>
860      * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
861      * while they are active. <p/>
862      * <P>
863      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
864      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
865      * value for this attribute is false.
866      *
867      * @param topic the <CODE>Topic</CODE> to subscribe to
868      * @return TopicSubscriber
869      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
870      * @throws InvalidDestinationException if an invalid topic is specified.
871      */
872     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
873         checkClosed();
874         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
875                 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
876                 false);
877     }
878 
879     /***
880      * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
881      * published by its own connection should be delivered to it. <p/>
882      * <P>
883      * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
884      * <p/>
885      * <P>
886      * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
887      * while they are active. <p/>
888      * <P>
889      * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
890      * subscriber's perspective, they do not exist. <p/>
891      * <P>
892      * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
893      * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
894      * value for this attribute is false.
895      *
896      * @param topic           the <CODE>Topic</CODE> to subscribe to
897      * @param messageSelector only messages with properties matching the message selector expression are delivered. A
898      *                        value of null or an empty string indicates that there is no message selector for the message consumer.
899      * @param noLocal         if set, inhibits the delivery of messages published by its own connection
900      * @return TopicSubscriber
901      * @throws JMSException                if the session fails to create a subscriber due to some internal error.
902      * @throws InvalidDestinationException if an invalid topic is specified.
903      * @throws InvalidSelectorException    if the message selector is invalid.
904      */
905     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
906         checkClosed();
907         return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
908                 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
909                 .getTopicPrefetch(), noLocal, false);
910     }
911 
912     /***
913      * Creates a publisher for the specified topic. <p/>
914      * <P>
915      * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates
916      * a <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
917      * relationship with the messages it has previously sent.
918      *
919      * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
920      * @return TopicPublisher
921      * @throws JMSException                if the session fails to create a publisher due to some internal error.
922      * @throws InvalidDestinationException if an invalid topic is specified.
923      */
924     public TopicPublisher createPublisher(Topic topic) throws JMSException {
925         checkClosed();
926         return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
927     }
928 
929     /***
930      * Unsubscribes a durable subscription that has been created by a client.
931      * <P>
932      * This method deletes the state being maintained on behalf of the subscriber by its provider.
933      * <P>
934      * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
935      * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
936      * transaction or has not been acknowledged in the session.
937      *
938      * @param name the name used to identify this subscription
939      * @throws JMSException                if the session fails to unsubscribe to the durable subscription due to some internal error.
940      * @throws InvalidDestinationException if an invalid subscription name is specified.
941      * @since 1.1
942      */
943     public void unsubscribe(String name) throws JMSException {
944         checkClosed();
945         DurableUnsubscribe ds = new DurableUnsubscribe();
946         ds.setId(this.packetIdGenerator.generateId());
947         ds.setClientId(this.connection.getClientID());
948         ds.setSubscriberName(name);
949         this.connection.syncSendPacket(ds);
950     }
951 
952     /***
953      * Tests to see if the Message Dispatcher is a target for this message
954      *
955      * @param message the message to test
956      * @return true if the Message Dispatcher can dispatch the message
957      */
958     public boolean isTarget(ActiveMQMessage message) {
959         for (Iterator i = this.consumers.iterator(); i.hasNext();) {
960             ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
961             if (message.isConsumerTarget(consumer.getConsumerNumber())) {
962                 return true;
963             }
964         }
965         return false;
966     }
967 
968     /***
969      * Dispatch an ActiveMQMessage
970      *
971      * @param message
972      */
973     public void dispatch(ActiveMQMessage message) {
974         message.setMessageAcknowledge(this);
975         synchronized (deliveryMutex) {
976             inboundMessages.add(message);
977             if (messageListener == null) {
978                 for (Iterator i = this.consumers.iterator(); i.hasNext();) {
979                     ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
980                     if (message.isConsumerTarget(consumer.getConsumerNumber())) {
981                         try {
982                             consumer.processMessage(message.shallowCopy());
983                         }
984                         catch (JMSException e) {
985                             connection.handleAsyncException(e);
986                         }
987                     }
988                 }
989             }
990         }
991     }
992 
993     /***
994      * Acknowledges all consumed messages of the session of this consumed message.
995      * <P>
996      * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
997      * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
998      * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered
999      * to.
1000      * <P>
1001      * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1002      * implicit acknowledgement modes.
1003      * <P>
1004      * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages
1005      * as an application-defined group (which is done by calling acknowledge on the last received message of the group,
1006      * thereby acknowledging all messages consumed by the session.)
1007      * <P>
1008      * Messages that have been received but not acknowledged may be redelivered.
1009      *
1010      * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1011      * @throws javax.jms.IllegalStateException
1012      *                      if this method is called on a closed session.
1013      * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1014      */
1015     public void acknowledge() throws JMSException {
1016         checkClosed();
1017         if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1018             for (Iterator i = deliveredMessages.iterator(); i.hasNext();) {
1019                 ActiveMQMessage msg = (ActiveMQMessage) i.next();
1020                 MessageAck ack = new MessageAck();
1021                 ack.setConsumerId(msg.getConsumerId());
1022                 ack.setMessageID(msg.getJMSMessageID());
1023                 ack.setMessageRead(msg.isMessageConsumed());
1024                 ack.setId(packetIdGenerator.generateId());
1025                 this.connection.asyncSendPacket(ack);
1026                 this.inboundMessages.remove(msg);
1027             }
1028         }
1029         deliveredMessages.clear();
1030     }
1031 
1032     protected void messageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed, boolean doRemove) {
1033         if (message != null && !closed.get()) {
1034             if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1035                 message.setMessageConsumed(messageConsumed);
1036                 deliveredMessages.add(message);
1037             }
1038             else {
1039 
1040                 if (sendAcknowledge) {
1041                     try {
1042                         doStartTransaction();
1043                         MessageAck ack = new MessageAck();
1044                         ack.setConsumerId(message.getConsumerId());
1045                         ack.setTransactionId(this.currentTransactionId);
1046                         ack.setMessageID(message.getJMSMessageID());
1047                         ack.setMessageRead(messageConsumed);
1048                         ack.setId(packetIdGenerator.generateId());
1049                         ack.setXaTransacted(isXaTransacted());
1050 
1051                         this.connection.asyncSendPacket(ack);
1052                     }
1053                     catch (JMSException e) {
1054                         log.warn("failed to notify Broker that message is delivered", e);
1055                     }
1056                 }
1057                 if (doRemove) {
1058                     inboundMessages.remove(message);
1059                 }
1060             }
1061         }
1062     }
1063 
1064     /***
1065      * @param consumer
1066      * @throws JMSException
1067      */
1068     protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1069         // lets add the stat
1070         if (consumer.isDurableSubscriber()) {
1071             stats.onCreateDurableSubscriber();
1072         }
1073         consumer.setConsumerId(consumerIdGenerator.generateId());
1074         ConsumerInfo info = createConsumerInfo(consumer);
1075         info.setStarted(true);
1076         this.connection.syncSendPacket(info);
1077         this.consumers.add(consumer);
1078     }
1079 
1080     /***
1081      * @param consumer
1082      * @throws JMSException
1083      */
1084     protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1085         this.consumers.remove(consumer);
1086 
1087         // lets remove the stat
1088         if (consumer.isDurableSubscriber()) {
1089             stats.onRemoveDurableSubscriber();
1090         }
1091         if (!closed.get()) {
1092             ConsumerInfo info = createConsumerInfo(consumer);
1093             info.setStarted(false);
1094             this.connection.asyncSendPacket(info);
1095         }
1096     }
1097 
1098     protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1099         ConsumerInfo info = new ConsumerInfo();
1100         info.setConsumerId(consumer.consumerId);
1101         info.setClientId(connection.clientID);
1102         info.setSessionId(this.sessionId);
1103         info.setConsumerNo(consumer.consumerNumber);
1104         info.setPrefetchNumber(consumer.prefetchNumber);
1105         info.setDestination(consumer.destination);
1106         info.setId(this.packetIdGenerator.generateId());
1107         info.setNoLocal(consumer.noLocal);
1108         info.setBrowser(consumer.browser);
1109         info.setSelector(consumer.messageSelector);
1110         info.setStartTime(consumer.startTime);
1111         info.setConsumerName(consumer.consumerName);
1112         return info;
1113     }
1114 
1115     /***
1116      * @param producer
1117      * @throws JMSException
1118      */
1119     protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1120         producer.setProducerId(producerIdGenerator.generateId());
1121         ProducerInfo info = createProducerInfo(producer);
1122         info.setStarted(true);
1123         this.connection.syncSendPacket(info);
1124         this.producers.add(producer);
1125     }
1126 
1127     /***
1128      * @param producer
1129      * @throws JMSException
1130      */
1131     protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1132         this.producers.remove(producer);
1133         if (!closed.get()) {
1134             ProducerInfo info = createProducerInfo(producer);
1135             info.setStarted(false);
1136             this.connection.asyncSendPacket(info);
1137         }
1138     }
1139 
1140     protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1141         ProducerInfo info = new ProducerInfo();
1142         info.setProducerId(producer.getProducerId());
1143         info.setClientId(connection.clientID);
1144         info.setSessionId(this.sessionId);
1145         info.setDestination(producer.defaultDestination);
1146         info.setId(this.packetIdGenerator.generateId());
1147         info.setStartTime(producer.getStartTime());
1148         return info;
1149     }
1150 
1151     /***
1152      * Start this Session
1153      */
1154     protected void start() {
1155     }
1156 
1157     /***
1158      * Stop this Session
1159      */
1160     protected void stop() {
1161     }
1162 
1163     /***
1164      * @return Returns the sessionId.
1165      */
1166     protected String getSessionId() {
1167         return sessionId;
1168     }
1169 
1170     /***
1171      * @param sessionId The sessionId to set.
1172      */
1173     protected void setSessionId(String sessionId) {
1174         this.sessionId = sessionId;
1175     }
1176 
1177     /***
1178      * @return Returns the startTime.
1179      */
1180     protected long getStartTime() {
1181         return startTime;
1182     }
1183 
1184     /***
1185      * @param startTime The startTime to set.
1186      */
1187     protected void setStartTime(long startTime) {
1188         this.startTime = startTime;
1189     }
1190 
1191     /***
1192      * send the message for dispatch by the broker
1193      *
1194      * @param producer
1195      * @param destination
1196      * @param message
1197      * @param deliveryMode
1198      * @param priority
1199      * @param timeToLive
1200      * @throws JMSException
1201      */
1202     protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1203                         int priority, long timeToLive) throws JMSException {
1204         checkClosed();
1205         //tell the Broker we are about to start a new transaction
1206         doStartTransaction();
1207         message.setJMSDestination(destination);
1208         message.setJMSDeliveryMode(deliveryMode);
1209         message.setJMSPriority(priority);
1210         long expiration = 0L;
1211         if (!producer.getDisableMessageTimestamp()) {
1212             long timeStamp = System.currentTimeMillis();
1213             message.setJMSTimestamp(timeStamp);
1214             if (timeToLive > 0) {
1215                 expiration = timeToLive + timeStamp;
1216             }
1217         }
1218         message.setJMSExpiration(expiration);
1219         if (!producer.getDisableMessageID()) {
1220             message.setJMSMessageID(producer.getIdGenerator().generateId());
1221         }
1222         //transform to our own message format here
1223         ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1224         msg.setProducerID(producer.getProducerId());
1225         msg.setTransactionId(currentTransactionId);
1226         msg.setXaTransacted(isXaTransacted());
1227         msg.setJMSClientID(this.connection.clientID);
1228         this.connection.asyncSendPacket(msg);
1229     }
1230 
1231     /***
1232      * Send TransactionInfo to indicate transaction has started
1233      *
1234      * @throws JMSException if some internal error occurs
1235      */
1236     protected void doStartTransaction() throws JMSException {
1237         if (getTransacted()) {
1238             if (startTransaction.commit(false, true)) {
1239                 TransactionInfo info = new TransactionInfo();
1240                 info.setId(this.packetIdGenerator.generateId());
1241                 info.setTransactionId(currentTransactionId);
1242                 info.setType(TransactionInfo.START);
1243                 this.connection.asyncSendPacket(info);
1244                 // Notify the listener that the tx was started.
1245                 if (localTransactionEventListener != null) {
1246                     localTransactionEventListener.beginEvent();
1247                 }
1248             }
1249         }
1250     }
1251 
1252     /***
1253      * @return Returns the localTransactionEventListener.
1254      */
1255     public LocalTransactionEventListener getLocalTransactionEventListener() {
1256         return localTransactionEventListener;
1257     }
1258 
1259     /***
1260      * Used by the resource adapter to listen to transaction events.
1261      *
1262      * @param localTransactionEventListener The localTransactionEventListener to set.
1263      */
1264     public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
1265         this.localTransactionEventListener = localTransactionEventListener;
1266     }
1267 
1268     protected boolean isXaTransacted() {
1269         return false;
1270     }
1271 
1272     //this has a misleading name, since in subclass ActiveMQXASession it is overridden to
1273     //return "currentTransactionId"
1274     protected String getNextTransactionId() {
1275         return this.transactionIdGenerator.generateId();
1276     }
1277 
1278 }