1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
21 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.activemq.management.JMSSessionStatsImpl;
25 import org.codehaus.activemq.management.StatsCapable;
26 import org.codehaus.activemq.message.*;
27 import org.codehaus.activemq.ra.LocalTransactionEventListener;
28 import org.codehaus.activemq.util.IdGenerator;
29
30 import javax.jms.*;
31 import javax.jms.IllegalStateException;
32 import javax.management.j2ee.statistics.Stats;
33 import java.io.Serializable;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.LinkedHashSet;
37 import java.util.LinkedList;
38
39 /***
40 * <P>
41 * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
42 * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
43 * <P>
44 * A session serves several purposes:
45 * <UL>
46 * <LI>It is a factory for its message producers and consumers.
47 * <LI>It supplies provider-optimized message factories.
48 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
49 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
50 * dynamically manipulate provider-specific destination names.
51 * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
52 * units.
53 * <LI>It defines a serial order for the messages it consumes and the messages it produces.
54 * <LI>It retains messages it consumes until they have been acknowledged.
55 * <LI>It serializes execution of message listeners registered with its message consumers.
56 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
57 * </UL>
58 * <P>
59 * A session can create and service multiple message producers and consumers.
60 * <P>
61 * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
62 * The thread may then use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
63 * <P>
64 * If a client desires to have one thread produce messages while others consume them, the client should use a separate
65 * session for its producing thread.
66 * <P>
67 * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
68 * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
69 * constituent objects from another thread of control. The only exception to this rule is the use of the session or
70 * connection <CODE>close</CODE> method.
71 * <P>
72 * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
73 * start simply and incrementally add message processing complexity as their need for concurrency grows.
74 * <P>
75 * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
76 * being executed in another thread.
77 * <P>
78 * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
79 * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
80 * transactions organize a session's input message stream and output message stream into series of atomic units. When a
81 * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
82 * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
83 * recovered.
84 * <P>
85 * The content of a transaction's input and output units is simply those messages that have been produced and consumed
86 * within the session's current transaction.
87 * <P>
88 * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
89 * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that
90 * a transacted session always has a current transaction within which its work is done.
91 * <P>
92 * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
93 * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
94 * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback
95 * </CODE> methods in this context is prohibited.
96 * <P>
97 * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
98 * <P>
99 * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
100 * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
101 * JMS API into their application server products.
102 *
103 * @version $Revision: 1.38 $
104 * @see javax.jms.Session
105 * @see javax.jms.QueueSession
106 * @see javax.jms.TopicSession
107 * @see javax.jms.XASession
108 */
109 public class ActiveMQSession
110 implements
111 Session,
112 QueueSession,
113 TopicSession,
114 ActiveMQMessageDispatcher,
115 MessageAcknowledge,
116 StatsCapable {
/** Logger shared by all instances of this class. */
private static final Log log = LogFactory.getLog(ActiveMQSession.class);
/** The connection this session was created from; the session registers itself with it in the constructor. */
protected ActiveMQConnection connection;
/** Acknowledge mode passed to the constructor; equals Session.SESSION_TRANSACTED for a transacted session. */
private int acknowledgeMode;
/** Consumers of this session; close() closes each element (cast to ActiveMQMessageConsumer) and clears the list. */
protected CopyOnWriteArrayList consumers;
/** Producers of this session; close() closes each element (cast to ActiveMQMessageProducer) and clears the list. */
protected CopyOnWriteArrayList producers;
/** Id generator created in the constructor; presumably backs getNextTransactionId() - confirm against that method. */
private IdGenerator transactionIdGenerator;
/** Id generator created in the constructor; presumably used to name temporary destinations - not used in this part of the file. */
private IdGenerator temporaryDestinationGenerator;
/** Generates the packet ids assigned to the TransactionInfo packets sent by commit() and rollback(). */
protected IdGenerator packetIdGenerator;
/** Id generator created in the constructor; presumably used for producer ids - not used in this part of the file. */
private IdGenerator producerIdGenerator;
/** Id generator created in the constructor; presumably used for consumer ids - not used in this part of the file. */
private IdGenerator consumerIdGenerator;
/** The session's distinguished listener (see setMessageListener); run() delivers inbound messages to it. */
private MessageListener messageListener;
/** Set to true at the end of close(); checkClosed() throws once it is true. */
protected SynchronizedBoolean closed;
/** Flag that commit() and rollback() test and reset to false in one step before sending their packet. */
private SynchronizedBoolean startTransaction;
/** Id obtained from connection.generateSessionId() at construction. */
private String sessionId;
/** Id of the transaction in progress; assigned at construction for transacted sessions and renewed by commit()/rollback(). */
protected String currentTransactionId;
/** System.currentTimeMillis() captured when the session was constructed. */
private long startTime;
/** Monitor held by recover() while it replays messages. */
private Object deliveryMutex;
/** Optional listener notified (commitEvent/rollbackEvent) after commit() or rollback() has sent its packet; may be null. */
private LocalTransactionEventListener localTransactionEventListener;
/**
 * deliveredMessages is only used if this Session is in Client Acknowledge mode
 */
private LinkedList deliveredMessages;
/**
 * Messages inbound from a ConnectionConsumer. run() iterates this list while holding its monitor.
 */
private LinkedList inboundMessages;
/** Statistics object built in the constructor from the producers and consumers lists. */
private JMSSessionStatsImpl stats;
144
145
146
147
/**
 * Constructs the session, initialises all of its state and finally registers it with the connection.
 *
 * @param theConnection      the connection that owns this session
 * @param theAcknowledgeMode n.b. if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
    this.connection = theConnection;
    this.acknowledgeMode = theAcknowledgeMode;
    this.consumers = new CopyOnWriteArrayList();
    this.producers = new CopyOnWriteArrayList();
    this.producerIdGenerator = new IdGenerator();
    this.consumerIdGenerator = new IdGenerator();
    this.transactionIdGenerator = new IdGenerator();
    this.temporaryDestinationGenerator = new IdGenerator();
    this.packetIdGenerator = new IdGenerator();
    this.closed = new SynchronizedBoolean(false);
    this.startTransaction = new SynchronizedBoolean(false);
    this.sessionId = connection.generateSessionId();
    this.startTime = System.currentTimeMillis();
    this.deliveredMessages = new LinkedList();
    this.inboundMessages = new LinkedList();
    this.deliveryMutex = new Object();
    if (getTransacted()) {
        // A transacted session always has a current transaction, so allocate the first id now.
        this.currentTransactionId = getNextTransactionId();
    }
    // Build the statistics BEFORE handing "this" to the connection: once addSession() has run,
    // other code can reach the session through the connection and must not observe a null stats field.
    this.stats = new JMSSessionStatsImpl(producers, consumers);
    connection.addSession(this);
}
178
179 public Stats getStats() {
180 return stats;
181 }
182
183 public JMSSessionStatsImpl getSessionStats() {
184 return stats;
185 }
186
187 /***
188 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
189 * containing a stream of uninterpreted bytes.
190 *
191 * @return the an ActiveMQBytesMessage
192 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
193 */
194 public BytesMessage createBytesMessage() throws JMSException {
195 checkClosed();
196 return new ActiveMQBytesMessage();
197 }
198
199 /***
200 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
201 * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
202 * Java programming language.
203 *
204 * @return an ActiveMQMapMessage
205 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
206 */
207 public MapMessage createMapMessage() throws JMSException {
208 checkClosed();
209 return new ActiveMQMapMessage();
210 }
211
212 /***
213 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
214 * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
215 * a message containing only header information is sufficient.
216 *
217 * @return an ActiveMQMessage
218 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
219 */
220 public Message createMessage() throws JMSException {
221 checkClosed();
222 return new ActiveMQMessage();
223 }
224
225 /***
226 * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
227 * that contains a serializable Java object.
228 *
229 * @return an ActiveMQObjectMessage
230 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
231 */
232 public ObjectMessage createObjectMessage() throws JMSException {
233 checkClosed();
234 return new ActiveMQObjectMessage();
235 }
236
237 /***
238 * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
239 * send a message that contains a serializable Java object.
240 *
241 * @param object the object to use to initialize this message
242 * @return an ActiveMQObjectMessage
243 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
244 */
245 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
246 checkClosed();
247 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
248 msg.setObject(object);
249 return msg;
250 }
251
252 /***
253 * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
254 * self-defining stream of primitive values in the Java programming language.
255 *
256 * @return an ActiveMQStreamMessage
257 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
258 */
259 public StreamMessage createStreamMessage() throws JMSException {
260 checkClosed();
261 return new ActiveMQStreamMessage();
262 }
263
264 /***
265 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
266 * containing a <CODE>String</CODE> object.
267 *
268 * @return an ActiveMQTextMessage
269 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
270 */
271 public TextMessage createTextMessage() throws JMSException {
272 checkClosed();
273 return new ActiveMQTextMessage();
274 }
275
276 /***
277 * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
278 * message containing a <CODE>String</CODE>.
279 *
280 * @param text the string used to initialize this message
281 * @return an ActiveMQTextMessage
282 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
283 */
284 public TextMessage createTextMessage(String text) throws JMSException {
285 checkClosed();
286 ActiveMQTextMessage msg = new ActiveMQTextMessage();
287 msg.setText(text);
288 return msg;
289 }
290
291 /***
292 * Indicates whether the session is in transacted mode.
293 *
294 * @return true if the session is in transacted mode
295 * @throws JMSException if there is some internal error.
296 */
297 public boolean getTransacted() throws JMSException {
298 checkClosed();
299 return this.acknowledgeMode == Session.SESSION_TRANSACTED;
300 }
301
302 /***
303 * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
304 * created. If the session is transacted, the acknowledgement mode is ignored.
305 *
306 * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
307 * session is transacted, returns SESSION_TRANSACTED.
308 * @throws JMSException
309 * @see javax.jms.Connection#createSession(boolean,int)
310 * @since 1.1 exception JMSException if there is some internal error.
311 */
312 public int getAcknowledgeMode() throws JMSException {
313 checkClosed();
314 return this.acknowledgeMode;
315 }
316
317 /***
318 * Commits all messages done in this transaction and releases any locks currently held.
319 *
320 * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
321 * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
322 * commit.
323 * @throws javax.jms.IllegalStateException
324 * if the method is not called by a transacted session.
325 */
326 public void commit() throws JMSException {
327 checkClosed();
328 if (!getTransacted()) {
329 throw new javax.jms.IllegalStateException("Not a transacted session");
330 }
331
332 if (this.startTransaction.commit(true, false)) {
333 TransactionInfo info = new TransactionInfo();
334 info.setId(this.packetIdGenerator.generateId());
335 info.setTransactionId(currentTransactionId);
336 info.setType(TransactionInfo.COMMIT);
337
338 this.currentTransactionId = getNextTransactionId();
339
340 this.connection.syncSendPacket(info);
341 if (localTransactionEventListener != null) {
342 localTransactionEventListener.commitEvent();
343 }
344 }
345 }
346
347 /***
348 * Rolls back any messages done in this transaction and releases any locks currently held.
349 *
350 * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
351 * @throws javax.jms.IllegalStateException
352 * if the method is not called by a transacted session.
353 */
354 public void rollback() throws JMSException {
355 checkClosed();
356 if (!getTransacted()) {
357 throw new javax.jms.IllegalStateException("Not a transacted session");
358 }
359
360 if (this.startTransaction.commit(true, false)) {
361 TransactionInfo info = new TransactionInfo();
362 info.setId(this.packetIdGenerator.generateId());
363 info.setTransactionId(currentTransactionId);
364 info.setType(TransactionInfo.ROLLBACK);
365
366 this.currentTransactionId = getNextTransactionId();
367 this.connection.asyncSendPacket(info);
368
369 if (localTransactionEventListener != null) {
370 localTransactionEventListener.rollbackEvent();
371 }
372 }
373 }
374
375 /***
376 * Closes the session.
377 * <P>
378 * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
379 * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
380 * be timely enough.
381 * <P>
382 * There is no need to close the producers and consumers of a closed session.
383 * <P>
384 * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A
385 * blocked message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
386 * <P>
387 * Closing a transacted session must roll back the transaction in progress.
388 * <P>
389 * This method is the only <CODE>Session</CODE> method that can be called concurrently.
390 * <P>
391 * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
392 * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
393 *
394 * @throws JMSException if the JMS provider fails to close the session due to some internal error.
395 */
396 public void close() throws JMSException {
397 if (!this.closed.get()) {
398 for (Iterator i = consumers.iterator(); i.hasNext();) {
399 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
400 consumer.close();
401 }
402 for (Iterator i = producers.iterator(); i.hasNext();) {
403 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
404 producer.close();
405 }
406 consumers.clear();
407 producers.clear();
408 this.connection.removeSession(this);
409 inboundMessages.clear();
410 deliveredMessages.clear();
411 closed.set(true);
412 }
413 }
414
415 /***
416 * @throws IllegalStateException if the Session is closed
417 */
418 protected void checkClosed() throws IllegalStateException {
419 if (this.closed.get()) {
420 throw new IllegalStateException("The Consumer is closed");
421 }
422 }
423
424 /***
425 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
426 * <P>
427 * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
428 * all messages that have been delivered to the client.
429 * <P>
430 * Restarting a session causes it to take the following actions:
431 * <UL>
432 * <LI>Stop message delivery
433 * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
434 * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
435 * Redelivered messages do not have to be delivered in exactly their original delivery order.
436 * </UL>
437 *
438 * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
439 * @throws IllegalStateException if the method is called by a transacted session.
440 */
441 public void recover() throws JMSException {
442 checkClosed();
443 if (getTransacted()) {
444 throw new IllegalStateException("This session is transacted");
445 }
446 synchronized (deliveryMutex) {
447 HashSet replay = new LinkedHashSet();
448 replay.addAll(deliveredMessages);
449 replay.addAll(inboundMessages);
450 deliveredMessages.clear();
451 inboundMessages.clear();
452 for (Iterator i = replay.iterator(); i.hasNext();) {
453 ActiveMQMessage msg = (ActiveMQMessage) i.next();
454 inboundMessages.remove(msg);
455 msg.setJMSRedelivered(true);
456 dispatch(msg);
457 }
458 replay.clear();
459 }
460 }
461
462 /***
463 * Returns the session's distinguished message listener (optional).
464 *
465 * @return the message listener associated with this session
466 * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
467 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
468 * @see javax.jms.ServerSessionPool
469 * @see javax.jms.ServerSession
470 */
471 public MessageListener getMessageListener() throws JMSException {
472 checkClosed();
473 return this.messageListener;
474 }
475
476 /***
477 * Sets the session's distinguished message listener (optional).
478 * <P>
479 * When the distinguished message listener is set, no other form of message receipt in the session can be used;
480 * however, all forms of sending messages are still supported.
481 * <P>
482 * This is an expert facility not used by regular JMS clients.
483 *
484 * @param listener the message listener to associate with this session
485 * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
486 * @see javax.jms.Session#getMessageListener()
487 * @see javax.jms.ServerSessionPool
488 * @see javax.jms.ServerSession
489 */
490 public void setMessageListener(MessageListener listener) throws JMSException {
491 checkClosed();
492 this.messageListener = listener;
493 }
494
495 /***
496 * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
497 *
498 * @see javax.jms.ServerSession
499 */
500 public void run() {
501 MessageListener listener = this.messageListener;
502
503 boolean doRemove = this.acknowledgeMode != Session.CLIENT_ACKNOWLEDGE;
504 synchronized (inboundMessages) {
505 for (Iterator i = inboundMessages.iterator(); i.hasNext();) {
506 ActiveMQMessage message = (ActiveMQMessage) i.next();
507 if (listener != null) {
508 try {
509 listener.onMessage(message);
510 this.messageDelivered(true, message, true, false);
511 }
512 catch (Throwable t) {
513 log.info("Caught :" + t, t);
514 this.messageDelivered(true, message, false, false);
515 }
516 }
517 else {
518 this.messageDelivered(true, message, false, false);
519 }
520 if (doRemove) {
521 i.remove();
522 }
523 }
524 }
525 }
526
527 /***
528 * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
529 * <P>
530 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
531 * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
532 * destination parameter to create a <CODE>MessageProducer</CODE> object.
533 *
534 * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have
535 * a specified destination.
536 * @return the MessageProducer
537 * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
538 * @throws InvalidDestinationException if an invalid destination is specified.
539 * @since 1.1
540 */
541 public MessageProducer createProducer(Destination destination) throws JMSException {
542 checkClosed();
543 return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
544 }
545
546 /***
547 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
548 * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
549 * create a <CODE>MessageConsumer</CODE>.
550 *
551 * @param destination the <CODE>Destination</CODE> to access.
552 * @return the MessageConsumer
553 * @throws JMSException if the session fails to create a consumer due to some internal error.
554 * @throws InvalidDestinationException if an invalid destination is specified.
555 * @since 1.1
556 */
557 public MessageConsumer createConsumer(Destination destination) throws JMSException {
558 checkClosed();
559 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
560 .getPrefetchPolicy().getQueuePrefetch();
561 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
562 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
563 }
564
565 /***
566 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
567 * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
568 * destination parameter to create a <CODE>MessageConsumer</CODE>.
569 * <P>
570 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
571 *
572 * @param destination the <CODE>Destination</CODE> to access
573 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
574 * value of null or an empty string indicates that there is no message selector for the message consumer.
575 * @return the MessageConsumer
576 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
577 * @throws InvalidDestinationException if an invalid destination is specified.
578 * @throws InvalidSelectorException if the message selector is invalid.
579 * @since 1.1
580 */
581 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
582 checkClosed();
583 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
584 .getPrefetchPolicy().getQueuePrefetch();
585 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
586 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
587 }
588
589 /***
590 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
591 * specify whether messages published by its own connection should be delivered to it, if the destination is a
592 * topic.
593 * <P>
594 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
595 * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
596 * <P>
597 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
598 * destination.
599 * <P>
600 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
601 * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
602 * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
603 * topics.
604 *
605 * @param destination the <CODE>Destination</CODE> to access
606 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
607 * value of null or an empty string indicates that there is no message selector for the message consumer.
608 * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
609 * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
610 * @return the MessageConsumer
611 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
612 * @throws InvalidDestinationException if an invalid destination is specified.
613 * @throws InvalidSelectorException if the message selector is invalid.
614 * @since 1.1
615 */
616 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
617 throws JMSException {
618 checkClosed();
619 int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
620 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
621 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
622 }
623
624 /***
625 * Creates a queue identity given a <CODE>Queue</CODE> name.
626 * <P>
627 * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
628 * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
629 * not portable.
630 * <P>
631 * Note that this method is not for creating the physical queue. The physical creation of queues is an
632 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
633 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
634 *
635 * @param queueName the name of this <CODE>Queue</CODE>
636 * @return a <CODE>Queue</CODE> with the given name
637 * @throws JMSException if the session fails to create a queue due to some internal error.
638 * @since 1.1
639 */
640 public Queue createQueue(String queueName) throws JMSException {
641 checkClosed();
642 return new ActiveMQQueue(queueName);
643 }
644
645 /***
646 * Creates a topic identity given a <CODE>Topic</CODE> name.
647 * <P>
648 * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
649 * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
650 * not portable.
651 * <P>
652 * Note that this method is not for creating the physical topic. The physical creation of topics is an
653 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
654 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
655 *
656 * @param topicName the name of this <CODE>Topic</CODE>
657 * @return a <CODE>Topic</CODE> with the given name
658 * @throws JMSException if the session fails to create a topic due to some internal error.
659 * @since 1.1
660 */
661 public Topic createTopic(String topicName) throws JMSException {
662 checkClosed();
663 return new ActiveMQTopic(topicName);
664 }
665 /***
666 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
667 *
668 * @param queue the <CODE>queue</CODE> to access
669 * @exception InvalidDestinationException if an invalid destination is specified
670 * @since 1.1
671 */
672 /***
673 * Creates a durable subscriber to the specified topic.
674 * <P>
675 * If a client needs to receive all the messages published on a topic, including the ones published while the
676 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
677 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
678 * acknowledged by this durable subscriber or they have expired.
679 * <P>
680 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
681 * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only
682 * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
683 * <P>
684 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
685 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
686 * unsubscribing (deleting) the old one and creating a new one.
687 * <P>
688 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
689 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
690 * value for this attribute is false.
691 *
692 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
693 * @param name the name used to identify this subscription
694 * @return the TopicSubscriber
695 * @throws JMSException if the session fails to create a subscriber due to some internal error.
696 * @throws InvalidDestinationException if an invalid topic is specified.
697 * @since 1.1
698 */
699 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
700 checkClosed();
701 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
702 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
703 false, false);
704 }
705
706 /***
707 * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
708 * published by its own connection should be delivered to it.
709 * <P>
710 * If a client needs to receive all the messages published on a topic, including the ones published while the
711 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
712 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
713 * acknowledged by this durable subscriber or they have expired.
714 * <P>
715 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
716 * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
717 * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
718 * inactive durable subscriber is one that exists but does not currently have a message consumer associated with
719 * it.
720 * <P>
721 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
722 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
723 * unsubscribing (deleting) the old one and creating a new one.
724 *
725 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
726 * @param name the name used to identify this subscription
727 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
728 * value of null or an empty string indicates that there is no message selector for the message consumer.
729 * @param noLocal if set, inhibits the delivery of messages published by its own connection
730 * @return the Queue Browser
731 * @throws JMSException if the session fails to create a subscriber due to some internal error.
732 * @throws InvalidDestinationException if an invalid topic is specified.
733 * @throws InvalidSelectorException if the message selector is invalid.
734 * @since 1.1
735 */
736 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
737 throws JMSException {
738 checkClosed();
739 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
740 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
741 .getDurableTopicPrefetch(), noLocal, false);
742 }
743
744 /***
745 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
746 *
747 * @param queue the <CODE>queue</CODE> to access
748 * @return the Queue Browser
749 * @throws JMSException if the session fails to create a browser due to some internal error.
750 * @throws InvalidDestinationException if an invalid destination is specified
751 * @since 1.1
752 */
753 public QueueBrowser createBrowser(Queue queue) throws JMSException {
754 checkClosed();
755 return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue), "",
756 this.connection.getNextConsumerNumber());
757 }
758
759 /***
760 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
761 * selector.
762 *
763 * @param queue the <CODE>queue</CODE> to access
764 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
765 * value of null or an empty string indicates that there is no message selector for the message consumer.
766 * @return the Queue Browser
767 * @throws JMSException if the session fails to create a browser due to some internal error.
768 * @throws InvalidDestinationException if an invalid destination is specified
769 * @throws InvalidSelectorException if the message selector is invalid.
770 * @since 1.1
771 */
772 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
773 checkClosed();
774 return new ActiveMQQueueBrowser(this, ActiveMQMessageTransformation.transformDestination(queue),
775 messageSelector, this.connection.getNextConsumerNumber());
776 }
777
778 /***
779 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE>
780 * unless it is deleted earlier.
781 *
782 * @return a temporary queue identity
783 * @throws JMSException if the session fails to create a temporary queue due to some internal error.
784 * @since 1.1
785 */
786 public TemporaryQueue createTemporaryQueue() throws JMSException {
787 checkClosed();
788 String tempQueueName = "TemporaryQueue-" + ActiveMQDestination.createTemporaryName(this.connection.getClientID());
789 tempQueueName += this.temporaryDestinationGenerator.generateId();
790 return new ActiveMQTemporaryQueue(tempQueueName);
791 }
792
793 /***
794 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE>
795 * unless it is deleted earlier.
796 *
797 * @return a temporary topic identity
798 * @throws JMSException if the session fails to create a temporary topic due to some internal error.
799 * @since 1.1
800 */
801 public TemporaryTopic createTemporaryTopic() throws JMSException {
802 checkClosed();
803 String tempTopicName = "TemporaryTopic-" + ActiveMQDestination.createTemporaryName(this.connection.getClientID());
804 tempTopicName += this.temporaryDestinationGenerator.generateId();
805 return new ActiveMQTemporaryTopic(tempTopicName);
806 }
807
808 /***
809 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
810 *
811 * @param queue the <CODE>Queue</CODE> to access
812 * @return @throws JMSException if the session fails to create a receiver due to some internal error.
813 * @throws JMSException
814 * @throws InvalidDestinationException if an invalid queue is specified.
815 */
816 public QueueReceiver createReceiver(Queue queue) throws JMSException {
817 checkClosed();
818 return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
819 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
820 }
821
822 /***
823 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
824 * selector.
825 *
826 * @param queue the <CODE>Queue</CODE> to access
827 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
828 * value of null or an empty string indicates that there is no message selector for the message consumer.
829 * @return QueueReceiver
830 * @throws JMSException if the session fails to create a receiver due to some internal error.
831 * @throws InvalidDestinationException if an invalid queue is specified.
832 * @throws InvalidSelectorException if the message selector is invalid.
833 */
834 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
835 checkClosed();
836 return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
837 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
838 .getQueuePrefetch());
839 }
840
841 /***
842 * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
843 *
844 * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
845 * @return QueueSender
846 * @throws JMSException if the session fails to create a sender due to some internal error.
847 * @throws InvalidDestinationException if an invalid queue is specified.
848 */
849 public QueueSender createSender(Queue queue) throws JMSException {
850 checkClosed();
851 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
852 }
853
854 /***
855 * Creates a nondurable subscriber to the specified topic. <p/>
856 * <P>
857 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
858 * <p/>
859 * <P>
860 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
861 * while they are active. <p/>
862 * <P>
863 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
864 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
865 * value for this attribute is false.
866 *
867 * @param topic the <CODE>Topic</CODE> to subscribe to
868 * @return TopicSubscriber
869 * @throws JMSException if the session fails to create a subscriber due to some internal error.
870 * @throws InvalidDestinationException if an invalid topic is specified.
871 */
872 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
873 checkClosed();
874 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
875 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
876 false);
877 }
878
879 /***
880 * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
881 * published by its own connection should be delivered to it. <p/>
882 * <P>
883 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
884 * <p/>
885 * <P>
886 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
887 * while they are active. <p/>
888 * <P>
889 * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
890 * subscriber's perspective, they do not exist. <p/>
891 * <P>
892 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
893 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
894 * value for this attribute is false.
895 *
896 * @param topic the <CODE>Topic</CODE> to subscribe to
897 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
898 * value of null or an empty string indicates that there is no message selector for the message consumer.
899 * @param noLocal if set, inhibits the delivery of messages published by its own connection
900 * @return TopicSubscriber
901 * @throws JMSException if the session fails to create a subscriber due to some internal error.
902 * @throws InvalidDestinationException if an invalid topic is specified.
903 * @throws InvalidSelectorException if the message selector is invalid.
904 */
905 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
906 checkClosed();
907 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
908 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
909 .getTopicPrefetch(), noLocal, false);
910 }
911
912 /***
913 * Creates a publisher for the specified topic. <p/>
914 * <P>
915 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates
916 * a <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
917 * relationship with the messages it has previously sent.
918 *
919 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
920 * @return TopicPublisher
921 * @throws JMSException if the session fails to create a publisher due to some internal error.
922 * @throws InvalidDestinationException if an invalid topic is specified.
923 */
924 public TopicPublisher createPublisher(Topic topic) throws JMSException {
925 checkClosed();
926 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
927 }
928
929 /***
930 * Unsubscribes a durable subscription that has been created by a client.
931 * <P>
932 * This method deletes the state being maintained on behalf of the subscriber by its provider.
933 * <P>
934 * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
935 * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
936 * transaction or has not been acknowledged in the session.
937 *
938 * @param name the name used to identify this subscription
939 * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
940 * @throws InvalidDestinationException if an invalid subscription name is specified.
941 * @since 1.1
942 */
943 public void unsubscribe(String name) throws JMSException {
944 checkClosed();
945 DurableUnsubscribe ds = new DurableUnsubscribe();
946 ds.setId(this.packetIdGenerator.generateId());
947 ds.setClientId(this.connection.getClientID());
948 ds.setSubscriberName(name);
949 this.connection.syncSendPacket(ds);
950 }
951
952 /***
953 * Tests to see if the Message Dispatcher is a target for this message
954 *
955 * @param message the message to test
956 * @return true if the Message Dispatcher can dispatch the message
957 */
958 public boolean isTarget(ActiveMQMessage message) {
959 for (Iterator i = this.consumers.iterator(); i.hasNext();) {
960 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
961 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
962 return true;
963 }
964 }
965 return false;
966 }
967
968 /***
969 * Dispatch an ActiveMQMessage
970 *
971 * @param message
972 */
973 public void dispatch(ActiveMQMessage message) {
974 message.setMessageAcknowledge(this);
975 synchronized (deliveryMutex) {
976 inboundMessages.add(message);
977 if (messageListener == null) {
978 for (Iterator i = this.consumers.iterator(); i.hasNext();) {
979 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
980 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
981 try {
982 consumer.processMessage(message.shallowCopy());
983 }
984 catch (JMSException e) {
985 connection.handleAsyncException(e);
986 }
987 }
988 }
989 }
990 }
991 }
992
993 /***
994 * Acknowledges all consumed messages of the session of this consumed message.
995 * <P>
996 * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
997 * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
998 * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered
999 * to.
1000 * <P>
1001 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1002 * implicit acknowledgement modes.
1003 * <P>
1004 * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages
1005 * as an application-defined group (which is done by calling acknowledge on the last received message of the group,
1006 * thereby acknowledging all messages consumed by the session.)
1007 * <P>
1008 * Messages that have been received but not acknowledged may be redelivered.
1009 *
1010 * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1011 * @throws javax.jms.IllegalStateException
1012 * if this method is called on a closed session.
1013 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1014 */
1015 public void acknowledge() throws JMSException {
1016 checkClosed();
1017 if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1018 for (Iterator i = deliveredMessages.iterator(); i.hasNext();) {
1019 ActiveMQMessage msg = (ActiveMQMessage) i.next();
1020 MessageAck ack = new MessageAck();
1021 ack.setConsumerId(msg.getConsumerId());
1022 ack.setMessageID(msg.getJMSMessageID());
1023 ack.setMessageRead(msg.isMessageConsumed());
1024 ack.setId(packetIdGenerator.generateId());
1025 this.connection.asyncSendPacket(ack);
1026 this.inboundMessages.remove(msg);
1027 }
1028 }
1029 deliveredMessages.clear();
1030 }
1031
1032 protected void messageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed, boolean doRemove) {
1033 if (message != null && !closed.get()) {
1034 if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1035 message.setMessageConsumed(messageConsumed);
1036 deliveredMessages.add(message);
1037 }
1038 else {
1039
1040 if (sendAcknowledge) {
1041 try {
1042 doStartTransaction();
1043 MessageAck ack = new MessageAck();
1044 ack.setConsumerId(message.getConsumerId());
1045 ack.setTransactionId(this.currentTransactionId);
1046 ack.setMessageID(message.getJMSMessageID());
1047 ack.setMessageRead(messageConsumed);
1048 ack.setId(packetIdGenerator.generateId());
1049 ack.setXaTransacted(isXaTransacted());
1050
1051 this.connection.asyncSendPacket(ack);
1052 }
1053 catch (JMSException e) {
1054 log.warn("failed to notify Broker that message is delivered", e);
1055 }
1056 }
1057 if (doRemove) {
1058 inboundMessages.remove(message);
1059 }
1060 }
1061 }
1062 }
1063
1064 /***
1065 * @param consumer
1066 * @throws JMSException
1067 */
1068 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1069
1070 if (consumer.isDurableSubscriber()) {
1071 stats.onCreateDurableSubscriber();
1072 }
1073 consumer.setConsumerId(consumerIdGenerator.generateId());
1074 ConsumerInfo info = createConsumerInfo(consumer);
1075 info.setStarted(true);
1076 this.connection.syncSendPacket(info);
1077 this.consumers.add(consumer);
1078 }
1079
1080 /***
1081 * @param consumer
1082 * @throws JMSException
1083 */
1084 protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1085 this.consumers.remove(consumer);
1086
1087
1088 if (consumer.isDurableSubscriber()) {
1089 stats.onRemoveDurableSubscriber();
1090 }
1091 if (!closed.get()) {
1092 ConsumerInfo info = createConsumerInfo(consumer);
1093 info.setStarted(false);
1094 this.connection.asyncSendPacket(info);
1095 }
1096 }
1097
1098 protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1099 ConsumerInfo info = new ConsumerInfo();
1100 info.setConsumerId(consumer.consumerId);
1101 info.setClientId(connection.clientID);
1102 info.setSessionId(this.sessionId);
1103 info.setConsumerNo(consumer.consumerNumber);
1104 info.setPrefetchNumber(consumer.prefetchNumber);
1105 info.setDestination(consumer.destination);
1106 info.setId(this.packetIdGenerator.generateId());
1107 info.setNoLocal(consumer.noLocal);
1108 info.setBrowser(consumer.browser);
1109 info.setSelector(consumer.messageSelector);
1110 info.setStartTime(consumer.startTime);
1111 info.setConsumerName(consumer.consumerName);
1112 return info;
1113 }
1114
1115 /***
1116 * @param producer
1117 * @throws JMSException
1118 */
1119 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1120 producer.setProducerId(producerIdGenerator.generateId());
1121 ProducerInfo info = createProducerInfo(producer);
1122 info.setStarted(true);
1123 this.connection.syncSendPacket(info);
1124 this.producers.add(producer);
1125 }
1126
1127 /***
1128 * @param producer
1129 * @throws JMSException
1130 */
1131 protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1132 this.producers.remove(producer);
1133 if (!closed.get()) {
1134 ProducerInfo info = createProducerInfo(producer);
1135 info.setStarted(false);
1136 this.connection.asyncSendPacket(info);
1137 }
1138 }
1139
1140 protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1141 ProducerInfo info = new ProducerInfo();
1142 info.setProducerId(producer.getProducerId());
1143 info.setClientId(connection.clientID);
1144 info.setSessionId(this.sessionId);
1145 info.setDestination(producer.defaultDestination);
1146 info.setId(this.packetIdGenerator.generateId());
1147 info.setStartTime(producer.getStartTime());
1148 return info;
1149 }
1150
1151 /***
1152 * Start this Session
1153 */
1154 protected void start() {
1155 }
1156
1157 /***
1158 * Stop this Session
1159 */
1160 protected void stop() {
1161 }
1162
1163 /***
1164 * @return Returns the sessionId.
1165 */
1166 protected String getSessionId() {
1167 return sessionId;
1168 }
1169
1170 /***
1171 * @param sessionId The sessionId to set.
1172 */
1173 protected void setSessionId(String sessionId) {
1174 this.sessionId = sessionId;
1175 }
1176
1177 /***
1178 * @return Returns the startTime.
1179 */
1180 protected long getStartTime() {
1181 return startTime;
1182 }
1183
1184 /***
1185 * @param startTime The startTime to set.
1186 */
1187 protected void setStartTime(long startTime) {
1188 this.startTime = startTime;
1189 }
1190
1191 /***
1192 * send the message for dispatch by the broker
1193 *
1194 * @param producer
1195 * @param destination
1196 * @param message
1197 * @param deliveryMode
1198 * @param priority
1199 * @param timeToLive
1200 * @throws JMSException
1201 */
1202 protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1203 int priority, long timeToLive) throws JMSException {
1204 checkClosed();
1205
1206 doStartTransaction();
1207 message.setJMSDestination(destination);
1208 message.setJMSDeliveryMode(deliveryMode);
1209 message.setJMSPriority(priority);
1210 long expiration = 0L;
1211 if (!producer.getDisableMessageTimestamp()) {
1212 long timeStamp = System.currentTimeMillis();
1213 message.setJMSTimestamp(timeStamp);
1214 if (timeToLive > 0) {
1215 expiration = timeToLive + timeStamp;
1216 }
1217 }
1218 message.setJMSExpiration(expiration);
1219 if (!producer.getDisableMessageID()) {
1220 message.setJMSMessageID(producer.getIdGenerator().generateId());
1221 }
1222
1223 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1224 msg.setProducerID(producer.getProducerId());
1225 msg.setTransactionId(currentTransactionId);
1226 msg.setXaTransacted(isXaTransacted());
1227 msg.setJMSClientID(this.connection.clientID);
1228 this.connection.asyncSendPacket(msg);
1229 }
1230
1231 /***
1232 * Send TransactionInfo to indicate transaction has started
1233 *
1234 * @throws JMSException if some internal error occurs
1235 */
1236 protected void doStartTransaction() throws JMSException {
1237 if (getTransacted()) {
1238 if (startTransaction.commit(false, true)) {
1239 TransactionInfo info = new TransactionInfo();
1240 info.setId(this.packetIdGenerator.generateId());
1241 info.setTransactionId(currentTransactionId);
1242 info.setType(TransactionInfo.START);
1243 this.connection.asyncSendPacket(info);
1244
1245 if (localTransactionEventListener != null) {
1246 localTransactionEventListener.beginEvent();
1247 }
1248 }
1249 }
1250 }
1251
1252 /***
1253 * @return Returns the localTransactionEventListener.
1254 */
1255 public LocalTransactionEventListener getLocalTransactionEventListener() {
1256 return localTransactionEventListener;
1257 }
1258
1259 /***
1260 * Used by the resource adapter to listen to transaction events.
1261 *
1262 * @param localTransactionEventListener The localTransactionEventListener to set.
1263 */
1264 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
1265 this.localTransactionEventListener = localTransactionEventListener;
1266 }
1267
1268 protected boolean isXaTransacted() {
1269 return false;
1270 }
1271
1272
1273
1274 protected String getNextTransactionId() {
1275 return this.transactionIdGenerator.generateId();
1276 }
1277
1278 }