001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq;
020    
021    import java.util.Iterator;
022    import java.util.Map;
023    
024    import javax.jms.Connection;
025    import javax.jms.ConnectionConsumer;
026    import javax.jms.ConnectionMetaData;
027    import javax.jms.DeliveryMode;
028    import javax.jms.Destination;
029    import javax.jms.ExceptionListener;
030    import javax.jms.IllegalStateException;
031    import javax.jms.JMSException;
032    import javax.jms.Queue;
033    import javax.jms.QueueConnection;
034    import javax.jms.QueueSession;
035    import javax.jms.ServerSessionPool;
036    import javax.jms.Session;
037    import javax.jms.Topic;
038    import javax.jms.TopicConnection;
039    import javax.jms.TopicSession;
040    import javax.jms.XAConnection;
041    
042    import org.activemq.advisories.TempDestinationAdvisor;
043    import org.activemq.advisories.TempDestinationAdvisoryEvent;
044    import org.activemq.capacity.CapacityMonitorEvent;
045    import org.activemq.capacity.CapacityMonitorEventListener;
046    import org.activemq.filter.AndFilter;
047    import org.activemq.filter.Filter;
048    import org.activemq.filter.FilterFactory;
049    import org.activemq.filter.FilterFactoryImpl;
050    import org.activemq.filter.NoLocalFilter;
051    import org.activemq.io.util.ByteArrayCompression;
052    import org.activemq.io.util.ByteArrayFragmentation;
053    import org.activemq.io.util.MemoryBoundedObjectManager;
054    import org.activemq.io.util.MemoryBoundedQueue;
055    import org.activemq.io.util.MemoryBoundedQueueManager;
056    import org.activemq.management.JMSConnectionStatsImpl;
057    import org.activemq.management.JMSStatsImpl;
058    import org.activemq.management.StatsCapable;
059    import org.activemq.management.StatsImpl;
060    import org.activemq.message.ActiveMQDestination;
061    import org.activemq.message.ActiveMQMessage;
062    import org.activemq.message.ActiveMQObjectMessage;
063    import org.activemq.message.BrokerAdminCommand;
064    import org.activemq.message.CapacityInfo;
065    import org.activemq.message.CleanupConnectionInfo;
066    import org.activemq.message.ConnectionInfo;
067    import org.activemq.message.ConsumerInfo;
068    import org.activemq.message.Packet;
069    import org.activemq.message.PacketListener;
070    import org.activemq.message.ProducerInfo;
071    import org.activemq.message.Receipt;
072    import org.activemq.message.ResponseReceipt;
073    import org.activemq.message.SessionInfo;
074    import org.activemq.message.TransactionInfo;
075    import org.activemq.message.WireFormatInfo;
076    import org.activemq.message.XATransactionInfo;
077    import org.activemq.transport.TransportChannel;
078    import org.activemq.transport.TransportStatusEvent;
079    import org.activemq.transport.TransportStatusEventListener;
080    import org.activemq.util.IdGenerator;
081    import org.activemq.util.JMSExceptionHelper;
082    import org.apache.commons.logging.Log;
083    import org.apache.commons.logging.LogFactory;
084    
085    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
086    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
087    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
088    import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
089    
090    /**
091     * A <CODE>Connection</CODE> object is a client's active connection to its JMS
092     * provider. It typically allocates provider resources outside the Java virtual
093     * machine (JVM).
094     * <P>
095     * Connections support concurrent use.
096     * <P>
097     * A connection serves several purposes:
098     * <UL>
099     * <LI>It encapsulates an open connection with a JMS provider. It typically
100     * represents an open TCP/IP socket between a client and the service provider
101     * software.
102     * <LI>Its creation is where client authentication takes place.
103     * <LI>It can specify a unique client identifier.
104     * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
105     * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
106     * </UL>
107     * <P>
108     * Because the creation of a connection involves setting up authentication and
109     * communication, a connection is a relatively heavyweight object. Most clients
110     * will do all their messaging with a single connection. Other more advanced
111     * applications may use several connections. The JMS API does not architect a
112     * reason for using multiple connections; however, there may be operational
113     * reasons for doing so.
114     * <P>
115     * A JMS client typically creates a connection, one or more sessions, and a
116     * number of message producers and consumers. When a connection is created, it
117     * is in stopped mode. That means that no messages are being delivered.
118     * <P>
119     * It is typical to leave the connection in stopped mode until setup is complete
120     * (that is, until all message consumers have been created). At that point, the
121     * client calls the connection's <CODE>start</CODE> method, and messages begin
122     * arriving at the connection's consumers. This setup convention minimizes any
123     * client confusion that may result from asynchronous message delivery while the
124     * client is still in the process of setting itself up.
125     * <P>
126     * A connection can be started immediately, and the setup can be done
127     * afterwards. Clients that do this must be prepared to handle asynchronous
128     * message delivery while they are still in the process of setting up.
129     * <P>
130     * A message producer can send messages while a connection is stopped. <p/>This
131     * class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
132     * object is an active connection to a publish/subscribe JMS provider. A client
133     * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
134     * objects for producing and consuming messages.
135     * <P>
136     * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
137     * from which specialized topic-related objects can be created. A more general,
138     * and recommended approach is to use the <CODE>Connection </CODE> object.
139     * <P>
140     * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A <CODE>QueueConnection</CODE>
141     * object is an active connection to a point-to-point JMS provider. A client
142     * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
143     * objects for producing and consuming messages.
144     * <P>
145     * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
146     * from which specialized queue-related objects can be created. A more general,
147     * and recommended, approach is to use the <CODE>Connection </CODE> object.
148     * <P>
149     * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
150     * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
151     * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
152     * if used from <CODE>QueueConnection</CODE>.
153     * 
154     * @version $Revision: 1.1.1.1 $
155     * @see javax.jms.Connection
156     * @see javax.jms.ConnectionFactory
157     * @see javax.jms.QueueConnection
158     * @see javax.jms.TopicConnection
159     * @see javax.jms.TopicConnectionFactory
160     * @see javax.jms.QueueConnection
161     * @see javax.jms.QueueConnectionFactory
162     */
163    public class ActiveMQConnection implements Connection, PacketListener,
164                    ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
165                    CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
166    
167            /**
168             * Default UserName for the Connection
169             */
170            public static final String DEFAULT_USER = "defaultUser";
171    
172            /**
173             * Default URL for the ActiveMQ Broker
174             */
175            public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
176    
177            /**
178             * Default client URL. If using a message broker in a hub(s)/spoke
179             * architecture - use the DEFAULT_BROKER_URL
180             * 
181             * @see ActiveMQConnection#DEFAULT_BROKER_URL
182             */
183            public static final String DEFAULT_URL = "peer://development";
184    
185            /**
186             * Default Password for the Connection
187             */
188            public static final String DEFAULT_PASSWORD = "defaultPassword";
189    
190            private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
191    
192            private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
193    
194            // properties
195            private ActiveMQConnectionFactory factory;
196    
197            private String userName;
198    
199            private String password;
200    
201            protected String clientID;
202    
203            private int sendCloseTimeout = 2000;
204    
205            private TransportChannel transportChannel;
206    
207            private ExceptionListener exceptionListener;
208    
209            private ActiveMQPrefetchPolicy prefetchPolicy;
210    
211            private JMSStatsImpl factoryStats;
212    
213            private MemoryBoundedObjectManager memoryManager;
214    
215            private MemoryBoundedQueueManager boundedQueueManager;
216    
217            protected IdGenerator handleIdGenerator;
218    
219            private IdGenerator clientIdGenerator;
220    
221            protected IdGenerator packetIdGenerator;
222    
223            private IdGenerator sessionIdGenerator;
224    
225            private JMSConnectionStatsImpl stats;
226    
227            // internal state
228            private CopyOnWriteArrayList sessions;
229    
230            private CopyOnWriteArrayList messageDispatchers;
231    
232            private CopyOnWriteArrayList connectionConsumers;
233    
234            private SynchronizedInt consumerNumberGenerator;
235    
236            private ActiveMQConnectionMetaData connectionMetaData;
237    
238            private boolean closed;
239    
240            private SynchronizedBoolean started;
241    
242            private boolean clientIDSet;
243    
244            private boolean isConnectionInfoSentToBroker;
245    
246            private boolean isTransportOK;
247    
248            private boolean startedTransport;
249    
250            private long startTime;
251    
252            private long flowControlSleepTime = 0;
253    
254            private boolean quickClose;
255    
256            private boolean internalConnection;// used for notifying that the
257                                                                                    // connection is used for networks etc.
258    
259            private boolean userSpecifiedClientID;
260    
261            /**
262             * Should we use an async send for persistent non transacted messages ?
263             */
264            protected boolean useAsyncSend = true;
265    
266            private int sendConnectionInfoTimeout = 30000;
267    
268            private boolean disableTimeStampsByDefault = false;
269    
270            private boolean J2EEcompliant = true;
271    
272            private boolean prepareMessageBodyOnSend = true;
273    
274            private boolean copyMessageOnSend = true;
275    
276            // compression and fragmentation variables
277    
278            private boolean doMessageCompression = true;
279    
280            private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;// data
281                                                                                                                                                                                            // size
282                                                                                                                                                                                            // above
283                                                                                                                                                                                            // which
284                                                                                                                                                                                            // compression
285                                                                                                                                                                                            // will
286                                                                                                                                                                                            // be
287                                                                                                                                                                                            // used
288    
289            private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
290    
291            private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;// default
292                                                                                                                                                                                                    // compression
293                                                                                                                                                                                                    // strategy
294    
295            private boolean doMessageFragmentation = true;
296    
297            private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
298    
299            private boolean cachingEnabled = true;
300    
301            private boolean optimizedMessageDispatch = false;
302    
303            private CopyOnWriteArrayList transientConsumedRedeliverCache;
304    
305            private FilterFactory filterFactory;
306    
307            private Map tempDestinationMap;
308    
309            private Map validDestinationsMap;
310    
311            private String resourceManagerId;
312    
313            /**
314             * A static helper method to create a new connection
315             * 
316             * @return an ActiveMQConnection
317             * @throws JMSException
318             */
319            public static ActiveMQConnection makeConnection() throws JMSException {
320                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
321                    return (ActiveMQConnection) factory.createConnection();
322            }
323    
324            /**
325             * A static helper method to create a new connection
326             * 
327             * @param uri
328             * @return and ActiveMQConnection
329             * @throws JMSException
330             */
331            public static ActiveMQConnection makeConnection(String uri)
332                            throws JMSException {
333                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
334                    return (ActiveMQConnection) factory.createConnection();
335            }
336    
337            /**
338             * A static helper method to create a new connection
339             * 
340             * @param user
341             * @param password
342             * @param uri
343             * @return an ActiveMQConnection
344             * @throws JMSException
345             */
346            public static ActiveMQConnection makeConnection(String user,
347                            String password, String uri) throws JMSException {
348                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,
349                                    password, uri);
350                    return (ActiveMQConnection) factory.createConnection();
351            }
352    
353            /**
354             * Constructs a connection from an existing TransportChannel and
355             * user/password.
356             * 
357             * @param factory
358             * @param theUserName
359             *            the users name
360             * @param thePassword
361             *            the password
362             * @param transportChannel
363             *            the transport channel to communicate with the server
364             * @throws JMSException
365             */
366            public ActiveMQConnection(ActiveMQConnectionFactory factory,
367                            String theUserName, String thePassword,
368                            TransportChannel transportChannel) throws JMSException {
369                    this(factory, theUserName, thePassword);
370                    this.transportChannel = transportChannel;
371                    this.transportChannel.setPacketListener(this);
372                    this.transportChannel.setExceptionListener(this);
373                    this.transportChannel.addTransportStatusEventListener(this);
374                    this.isTransportOK = true;
375            }
376    
/**
 * Shared initialisation for all connections: stores the credentials,
 * creates the id generators, collections, statistics and memory managers,
 * registers this connection with the factory statistics and finally
 * notifies the factory through <CODE>onConnectionCreate</CODE>.
 * <P>
 * No transport channel is set here.
 *
 * @param factory
 *            the factory creating this connection; must not be null
 *            because it is dereferenced
 * @param theUserName
 *            the users name
 * @param thePassword
 *            the password
 */
protected ActiveMQConnection(ActiveMQConnectionFactory factory,
                String theUserName, String thePassword) {
        // Identity and credentials.
        this.factory = factory;
        userName = theUserName;
        password = thePassword;

        // Id and number generators.
        clientIdGenerator = new IdGenerator();
        packetIdGenerator = new IdGenerator();
        handleIdGenerator = new IdGenerator();
        sessionIdGenerator = new IdGenerator();
        consumerNumberGenerator = new SynchronizedInt(0);

        // Bookkeeping collections and basic state; the connection begins un-started.
        sessions = new CopyOnWriteArrayList();
        messageDispatchers = new CopyOnWriteArrayList();
        connectionConsumers = new CopyOnWriteArrayList();
        connectionMetaData = new ActiveMQConnectionMetaData();
        started = new SynchronizedBoolean(false);
        startTime = System.currentTimeMillis();
        prefetchPolicy = new ActiveMQPrefetchPolicy();

        // Memory management.
        // NOTE(review): clientID has not been assigned at this point, so null is
        // passed as the manager's first argument - confirm this is intended.
        memoryManager = new MemoryBoundedObjectManager(clientID,
                        DEFAULT_CONNECTION_MEMORY_LIMIT);
        boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
        memoryManager.addCapacityEventListener(this);

        // Statistics: join the factory-wide stats, then create the per-connection
        // stats (flagged transactional when this is an XAConnection).
        factoryStats = factory.getFactoryStats();
        factoryStats.addConnection(this);
        stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);

        transientConsumedRedeliverCache = new CopyOnWriteArrayList();
        tempDestinationMap = new ConcurrentHashMap();
        validDestinationsMap = new ConcurrentHashMap();

        // Last step: tell the factory about the new connection.
        factory.onConnectionCreate(this);
}
407    
408            /**
409             * @return statistics for this Connection
410             */
411            public StatsImpl getStats() {
412                    return stats;
413            }
414    
415            /**
416             * @return a number unique for this connection
417             */
418            public JMSConnectionStatsImpl getConnectionStats() {
419                    return stats;
420            }
421    
422            /**
423             * Creates a <CODE>Session</CODE> object.
424             * 
425             * @param transacted
426             *            indicates whether the session is transacted
427             * @param acknowledgeMode
428             *            indicates whether the consumer or the client will acknowledge
429             *            any messages it receives; ignored if the session is
430             *            transacted. Legal values are
431             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
432             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
433             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
434             * @return a newly created session
435             * @throws JMSException
436             *             if the <CODE>Connection</CODE> object fails to create a
437             *             session due to some internal error or lack of support for the
438             *             specific transaction and acknowledgement mode.
439             * @see Session#AUTO_ACKNOWLEDGE
440             * @see Session#CLIENT_ACKNOWLEDGE
441             * @see Session#DUPS_OK_ACKNOWLEDGE
442             * @since 1.1
443             */
444            public Session createSession(boolean transacted, int acknowledgeMode)
445                            throws JMSException {
446                    checkClosed();
447                    sendConnectionInfoToBroker();
448                    return new ActiveMQSession(
449                                    this,
450                                    (transacted ? Session.SESSION_TRANSACTED
451                                                    : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
452                                                                    : acknowledgeMode)));
453            }
454    
455            /**
456             * Creates a <CODE>Session</CODE> object.
457             * 
458             * @param transacted
459             *            indicates whether the session is transacted
460             * @param acknowledgeMode
461             *            indicates whether the consumer or the client will acknowledge
462             *            any messages it receives; ignored if the session is
463             *            transacted. Legal values are
464             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
465             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
466             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
467             * @param optimizedDispatch
468             * @return a newly created session
469             * @throws JMSException
470             *             if the <CODE>Connection</CODE> object fails to create a
471             *             session due to some internal error or lack of support for the
472             *             specific transaction and acknowledgement mode.
473             * @see Session#AUTO_ACKNOWLEDGE
474             * @see Session#CLIENT_ACKNOWLEDGE
475             * @see Session#DUPS_OK_ACKNOWLEDGE
476             * @since 1.1
477             */
478            public Session createSession(boolean transacted, int acknowledgeMode,
479                            boolean optimizedDispatch) throws JMSException {
480                    checkClosed();
481                    sendConnectionInfoToBroker();
482                    return new ActiveMQSession(this,
483                                    (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode),
484                                    optimizedDispatch);
485            }
486    
487            /**
488             * Gets the client identifier for this connection.
489             * <P>
490             * This value is specific to the JMS provider. It is either preconfigured by
491             * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
492             * dynamically by the application by calling the <code>setClientID</code>
493             * method.
494             * 
495             * @return the unique client identifier
496             * @throws JMSException
497             *             if the JMS provider fails to return the client ID for this
498             *             connection due to some internal error.
499             */
500            public String getClientID() throws JMSException {
501                    checkClosed();
502                    return this.clientID;
503            }
504    
505            /**
506             * Sets the client identifier for this connection.
507             * <P>
508             * The preferred way to assign a JMS client's client identifier is for it to
509             * be configured in a client-specific <CODE>ConnectionFactory</CODE>
510             * object and transparently assigned to the <CODE>Connection</CODE> object
511             * it creates.
512             * <P>
513             * Alternatively, a client can set a connection's client identifier using a
514             * provider-specific value. The facility to set a connection's client
515             * identifier explicitly is not a mechanism for overriding the identifier
516             * that has been administratively configured. It is provided for the case
517             * where no administratively specified identifier exists. If one does exist,
518             * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
519             * If a client sets the client identifier explicitly, it must do so
520             * immediately after it creates the connection and before any other action
521             * on the connection is taken. After this point, setting the client
522             * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
523             * <P>
524             * The purpose of the client identifier is to associate a connection and its
525             * objects with a state maintained on behalf of the client by a provider.
526             * The only such state identified by the JMS API is that required to support
527             * durable subscriptions.
528             * <P>
529             * If another connection with the same <code>clientID</code> is already
530             * running when this method is called, the JMS provider should detect the
531             * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
532             * 
533             * @param newClientID
534             *            the unique client identifier
535             * @throws JMSException
536             *             if the JMS provider fails to set the client ID for this
537             *             connection due to some internal error.
538             * @throws javax.jms.InvalidClientIDException
539             *             if the JMS client specifies an invalid or duplicate client
540             *             ID.
541             * @throws javax.jms.IllegalStateException
542             *             if the JMS client attempts to set a connection's client ID at
543             *             the wrong time or when it has been administratively
544             *             configured.
545             */
546            public void setClientID(String newClientID) throws JMSException {
547                    if (this.clientIDSet) {
548                            throw new IllegalStateException("The clientID has already been set");
549                    }
550                    if (this.isConnectionInfoSentToBroker) {
551                            throw new IllegalStateException(
552                                            "Setting clientID on a used Connection is not allowed");
553                    }
554                    checkClosed();
555                    this.clientID = newClientID;
556                    this.userSpecifiedClientID = true;
557                    ensureClientIDInitialised();
558            }
559    
560            /**
561             * Gets the metadata for this connection.
562             * 
563             * @return the connection metadata
564             * @throws JMSException
565             *             if the JMS provider fails to get the connection metadata for
566             *             this connection.
567             * @see javax.jms.ConnectionMetaData
568             */
569            public ConnectionMetaData getMetaData() throws JMSException {
570                    checkClosed();
571                    return this.connectionMetaData;
572            }
573    
574            /**
575             * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
576             * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
577             * associated with it.
578             * 
579             * @return the <CODE>ExceptionListener</CODE> for this connection, or
580             *         null. if no <CODE>ExceptionListener</CODE> is associated with
581             *         this connection.
582             * @throws JMSException
583             *             if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
584             *             for this connection.
585             * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
586             */
587            public ExceptionListener getExceptionListener() throws JMSException {
588                    checkClosed();
589                    return this.exceptionListener;
590            }
591    
592            /**
593             * Sets an exception listener for this connection.
594             * <P>
595             * If a JMS provider detects a serious problem with a connection, it informs
596             * the connection's <CODE> ExceptionListener</CODE>, if one has been
597             * registered. It does this by calling the listener's <CODE>onException
598             * </CODE> method, passing it a <CODE>JMSException</CODE> object
599             * describing the problem.
600             * <P>
601             * An exception listener allows a client to be notified of a problem
602             * asynchronously. Some connections only consume messages, so they would
603             * have no other way to learn their connection has failed.
604             * <P>
605             * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
606             * <P>
607             * A JMS provider should attempt to resolve connection problems itself
608             * before it notifies the client of them.
609             * 
610             * @param listener
611             *            the exception listener
612             * @throws JMSException
613             *             if the JMS provider fails to set the exception listener for
614             *             this connection.
615             */
616            public void setExceptionListener(ExceptionListener listener)
617                            throws JMSException {
618                    checkClosed();
619                    this.exceptionListener = listener;
620                    this.transportChannel.setExceptionListener(listener);
621            }
622    
623            /**
624             * Starts (or restarts) a connection's delivery of incoming messages. A call
625             * to <CODE>start</CODE> on a connection that has already been started is
626             * ignored.
627             * 
628             * @throws JMSException
629             *             if the JMS provider fails to start message delivery due to
630             *             some internal error.
631             * @see javax.jms.Connection#stop()
632             */
633            public void start() throws JMSException {
634                    checkClosed();
635                    if (started.commit(false, true)) {
636                            // We have a change in connection info to send.
637                            // send the Connection info again
638                            sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
639                            for (Iterator i = sessions.iterator(); i.hasNext();) {
640                                    ActiveMQSession s = (ActiveMQSession) i.next();
641                                    s.start();
642                            }
643                    }
644            }
645    
646            /**
647             * @return true if this Connection is started
648             */
649            protected boolean isStarted() {
650                    return started.get();
651            }
652    
653            /**
654             * Temporarily stops a connection's delivery of incoming messages. Delivery
655             * can be restarted using the connection's <CODE>start</CODE> method. When
656             * the connection is stopped, delivery to all the connection's message
657             * consumers is inhibited: synchronous receives block, and messages are not
658             * delivered to message listeners.
659             * <P>
660             * This call blocks until receives and/or message listeners in progress have
661             * completed.
662             * <P>
663             * Stopping a connection has no effect on its ability to send messages. A
664             * call to <CODE>stop</CODE> on a connection that has already been stopped
665             * is ignored.
666             * <P>
667             * A call to <CODE>stop</CODE> must not return until delivery of messages
668             * has paused. This means that a client can rely on the fact that none of
669             * its message listeners will be called and that all threads of control
670             * waiting for <CODE>receive</CODE> calls to return will not return with a
671             * message until the connection is restarted. The receive timers for a
672             * stopped connection continue to advance, so receives may time out while
673             * the connection is stopped.
674             * <P>
675             * If message listeners are running when <CODE>stop</CODE> is invoked, the
676             * <CODE>stop</CODE> call must wait until all of them have returned before
677             * it may return. While these message listeners are completing, they must
678             * have the full services of the connection available to them.
679             * 
680             * @throws JMSException
681             *             if the JMS provider fails to stop message delivery due to
682             *             some internal error.
683             * @see javax.jms.Connection#start()
684             */
685            public void stop() throws JMSException {
686                    checkClosed();
687                    if (started.commit(true, false)) {
688                            for (Iterator i = sessions.iterator(); i.hasNext();) {
689                                    ActiveMQSession s = (ActiveMQSession) i.next();
690                                    s.stop();
691                            }
692                            sendConnectionInfoToBroker(2000, true, false);
693                    }
694            }
695    
696            /**
697             * Closes the connection.
698             * <P>
699             * Since a provider typically allocates significant resources outside the
700             * JVM on behalf of a connection, clients should close these resources when
701             * they are not needed. Relying on garbage collection to eventually reclaim
702             * these resources may not be timely enough.
703             * <P>
704             * There is no need to close the sessions, producers, and consumers of a
705             * closed connection.
706             * <P>
707             * Closing a connection causes all temporary destinations to be deleted.
708             * <P>
709             * When this method is invoked, it should not return until message
710             * processing has been shut down in an orderly fashion. This means that all
711             * message listeners that may have been running have returned, and that all
712             * pending receives have returned. A close terminates all pending message
713             * receives on the connection's sessions' consumers. The receives may return
714             * with a message or with null, depending on whether there was a message
715             * available at the time of the close. If one or more of the connection's
716             * sessions' message listeners is processing a message at the time when
717             * connection <CODE>close</CODE> is invoked, all the facilities of the
718             * connection and its sessions must remain available to those listeners
719             * until they return control to the JMS provider.
720             * <P>
721             * Closing a connection causes any of its sessions' transactions in progress
722             * to be rolled back. In the case where a session's work is coordinated by
723             * an external transaction manager, a session's <CODE>commit</CODE> and
724             * <CODE> rollback</CODE> methods are not used and the result of a closed
725             * session's work is determined later by the transaction manager. Closing a
726             * connection does NOT force an acknowledgment of client-acknowledged
727             * sessions.
728             * <P>
729             * Invoking the <CODE>acknowledge</CODE> method of a received message from
730             * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
731             * Closing a closed connection must NOT throw an exception.
732             * 
733             * @throws JMSException
734             *             if the JMS provider fails to close the connection due to some
735             *             internal error. For example, a failure to release resources
736             *             or to close a socket connection can cause this exception to
737             *             be thrown.
738             */
739            public void close() throws JMSException {
740                    this.transportChannel.setPendingStop(true);
741                    synchronized (this) {
742                            if (!closed) {
743                                    memoryManager.removeCapacityEventListener(this);
744                                    try {
745                                            closeTemporaryDestinations();
746                                            for (Iterator i = this.sessions.iterator(); i.hasNext();) {
747                                                    ActiveMQSession s = (ActiveMQSession) i.next();
748                                                    s.close();
749                                            }
750                                            for (Iterator i = this.connectionConsumers.iterator(); i
751                                                            .hasNext();) {
752                                                    ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
753                                                                    .next();
754                                                    c.close();
755                                            }
756                                            try {
757                                                    sendConnectionInfoToBroker(sendCloseTimeout, true, true);
758                                            } catch (TimeoutExpiredException e) {
759                                                    log
760                                                                    .warn("Failed to send close to broker, timeout expired of: "
761                                                                                    + sendCloseTimeout + " millis");
762                                            }
763                                            this.connectionConsumers.clear();
764                                            this.messageDispatchers.clear();
765                                            this.transportChannel.stop();
766                                    } finally {
767                                            this.sessions.clear();
768                                            started.set(false);
769                                            factory.onConnectionClose(this);
770                                    }
771                                    closed = true;
772                                    transientConsumedRedeliverCache.clear();
773                                    validDestinationsMap.clear();
774                     factoryStats.removeConnection(this);
775                            }
776                    }
777    
778            }
779    
780        /**
781         * Tells the broker to terminate its VM.  This can be used to cleanly terminate a broker running in
782         * a standalone java process.  Server must have property enable.vm.shutdown=true defined
783         * to allow this to work.
784         */
785        public void terminateBrokerVM() throws JMSException {
786            BrokerAdminCommand command = new BrokerAdminCommand();
787            command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
788            asyncSendPacket(command);
789        }
790    
791            /**
792             * simply throws an exception if the Connection is already closed
793             * 
794             * @throws JMSException
795             */
796            protected synchronized void checkClosed() throws JMSException {
797                    if (!startedTransport) {
798                            startedTransport = true;
799                            this.transportChannel.setCachingEnabled(isCachingEnabled());
800                            if (useAsyncSend == false) {
801                                    this.transportChannel.setNoDelay(true);
802                            }
803    
804                            this.transportChannel.setUsedInternally(internalConnection);
805                            this.transportChannel.start();
806                            if (transportChannel.doesSupportWireFormatVersioning()) {
807                                    WireFormatInfo info = new WireFormatInfo();
808                                    info.setVersion(transportChannel.getCurrentWireFormatVersion());
809                                    this.asyncSendPacket(info);
810                            }
811                    }
812                    if (this.closed) {
813                            throw new ConnectionClosedException();
814                    }
815            }
816    
817            /**
818             * Creates a connection consumer for this connection (optional operation).
819             * This is an expert facility not used by regular JMS clients.
820             * 
821             * @param destination
822             *            the destination to access
823             * @param messageSelector
824             *            only messages with properties matching the message selector
825             *            expression are delivered. A value of null or an empty string
826             *            indicates that there is no message selector for the message
827             *            consumer.
828             * @param sessionPool
829             *            the server session pool to associate with this connection
830             *            consumer
831             * @param maxMessages
832             *            the maximum number of messages that can be assigned to a
833             *            server session at one time
834             * @return the connection consumer
835             * @throws JMSException
836             *             if the <CODE>Connection</CODE> object fails to create a
837             *             connection consumer due to some internal error or invalid
838             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
839             * @throws javax.jms.InvalidDestinationException
840             *             if an invalid destination is specified.
841             * @throws javax.jms.InvalidSelectorException
842             *             if the message selector is invalid.
843             * @see javax.jms.ConnectionConsumer
844             * @since 1.1
845             */
846            public ConnectionConsumer createConnectionConsumer(Destination destination,
847                            String messageSelector, ServerSessionPool sessionPool,
848                            int maxMessages) throws JMSException {
849                    checkClosed();
850                    ensureClientIDInitialised();
851                    ConsumerInfo info = new ConsumerInfo();
852                    info.setConsumerId(handleIdGenerator.generateId());
853                    info.setDestination(ActiveMQMessageTransformation
854                                    .transformDestination(destination));
855                    info.setSelector(messageSelector);
856                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
857                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
858                                    maxMessages);
859            }
860        
861        /**
862         * Creates a connection consumer for this connection (optional operation).
863         * This is an expert facility not used by regular JMS clients.
864         * 
865         * @param destination
866         *            the destination to access
867         * @param messageSelector
868         *            only messages with properties matching the message selector
869         *            expression are delivered. A value of null or an empty string
870         *            indicates that there is no message selector for the message
871         *            consumer.
872         * @param sessionPool
873         *            the server session pool to associate with this connection
874         *            consumer
875         * @param maxMessages
876         *            the maximum number of messages that can be assigned to a
877         *            server session at one time
878         * @param noLocal
879         *            set true if you want to filter out messages published locally
880         *           
881         * @return the connection consumer
882         * @throws JMSException
883         *             if the <CODE>Connection</CODE> object fails to create a
884         *             connection consumer due to some internal error or invalid
885         *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
886         * @throws javax.jms.InvalidDestinationException
887         *             if an invalid destination is specified.
888         * @throws javax.jms.InvalidSelectorException
889         *             if the message selector is invalid.
890         * @see javax.jms.ConnectionConsumer
891         * @since 1.1
892         */
893        public ConnectionConsumer createConnectionConsumer(Destination destination,
894                String messageSelector, ServerSessionPool sessionPool,
895                int maxMessages, boolean noLocal) throws JMSException {
896            
897            checkClosed();
898            ensureClientIDInitialised();
899            ConsumerInfo info = new ConsumerInfo();
900            info.setConsumerId(handleIdGenerator.generateId());
901            info.setDestination(ActiveMQMessageTransformation
902                    .transformDestination(destination));
903            info.setSelector(messageSelector);
904            info.setConsumerNo(handleIdGenerator.getNextShortSequence());
905            info.setNoLocal(noLocal);
906            return new ActiveMQConnectionConsumer(this, sessionPool, info,
907                    maxMessages);
908        }
909    
910    
911    
912            /**
913             * Create a durable connection consumer for this connection (optional
914             * operation). This is an expert facility not used by regular JMS clients.
915             * 
916             * @param topic
917             *            topic to access
918             * @param subscriptionName
919             *            durable subscription name
920             * @param messageSelector
921             *            only messages with properties matching the message selector
922             *            expression are delivered. A value of null or an empty string
923             *            indicates that there is no message selector for the message
924             *            consumer.
925             * @param sessionPool
926             *            the server session pool to associate with this durable
927             *            connection consumer
928             * @param maxMessages
929             *            the maximum number of messages that can be assigned to a
930             *            server session at one time
931             * @return the durable connection consumer
932             * @throws JMSException
933             *             if the <CODE>Connection</CODE> object fails to create a
934             *             connection consumer due to some internal error or invalid
935             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
936             * @throws javax.jms.InvalidDestinationException
937             *             if an invalid destination is specified.
938             * @throws javax.jms.InvalidSelectorException
939             *             if the message selector is invalid.
940             * @see javax.jms.ConnectionConsumer
941             * @since 1.1
942             */
943            public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
944                            String subscriptionName, String messageSelector,
945                            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
946                    checkClosed();
947                    ensureClientIDInitialised();
948                    ConsumerInfo info = new ConsumerInfo();
949                    info.setConsumerId(this.handleIdGenerator.generateId());
950                    info.setDestination(ActiveMQMessageTransformation
951                                    .transformDestination(topic));
952                    info.setSelector(messageSelector);
953                    info.setConsumerName(subscriptionName);
954                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
955                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
956                                    maxMessages);
957            }
958        
959        /**
960         * Create a durable connection consumer for this connection (optional
961         * operation). This is an expert facility not used by regular JMS clients.
962         * 
963         * @param topic
964         *            topic to access
965         * @param subscriptionName
966         *            durable subscription name
967         * @param messageSelector
968         *            only messages with properties matching the message selector
969         *            expression are delivered. A value of null or an empty string
970         *            indicates that there is no message selector for the message
971         *            consumer.
972         * @param sessionPool
973         *            the server session pool to associate with this durable
974         *            connection consumer
975         * @param maxMessages
976         *            the maximum number of messages that can be assigned to a
977         *            server session at one time
978         * @param noLocal
979         *            set true if you want to filter out messages published locally
980         *            
981         * @return the durable connection consumer
982         * @throws JMSException
983         *             if the <CODE>Connection</CODE> object fails to create a
984         *             connection consumer due to some internal error or invalid
985         *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
986         * @throws javax.jms.InvalidDestinationException
987         *             if an invalid destination is specified.
988         * @throws javax.jms.InvalidSelectorException
989         *             if the message selector is invalid.
990         * @see javax.jms.ConnectionConsumer
991         * @since 1.1
992         */
993        public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
994                String subscriptionName, String messageSelector,
995                ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
996            checkClosed();
997            ensureClientIDInitialised();
998            ConsumerInfo info = new ConsumerInfo();
999            info.setConsumerId(this.handleIdGenerator.generateId());
1000            info.setDestination(ActiveMQMessageTransformation
1001                    .transformDestination(topic));
1002            info.setSelector(messageSelector);
1003            info.setConsumerName(subscriptionName);
1004            info.setNoLocal(noLocal);
1005            info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1006            return new ActiveMQConnectionConsumer(this, sessionPool, info,
1007                    maxMessages);
1008        }
1009    
1010            /**
1011             * Implementation of the PacketListener interface - consume a packet
1012             * 
1013             * @param packet -
1014             *            the Packet to consume
1015             * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
1016             */
1017            public void consume(Packet packet) {
1018                    if (!closed && packet != null) {
1019                            if (packet.isJMSMessage()) {
1020                                    ActiveMQMessage message = (ActiveMQMessage) packet;
1021                                    message.setReadOnly(true);
1022                                    message.setConsumerIdentifer(clientID);
1023    
1024                                    // lets check for expired messages which is only relevant for
1025                                    // multicast based stuff
1026                                    // as a pointcast based network should filter out this stuff
1027                                    if (transportChannel.isMulticast()) {
1028                                            long expiration = message.getJMSExpiration();
1029                                            if (expiration > 0) {
1030                                                    long timeStamp = System.currentTimeMillis();
1031                                                    if (timeStamp > expiration) {
1032                                                            if (log.isDebugEnabled()) {
1033                                                                    log.debug("Discarding expired message: "
1034                                                                                    + message);
1035                                                            }
1036                                                            return;
1037                                                    }
1038                                            }
1039                                    }
1040    
1041                                    try {
1042                                            int count = 0;
1043                                            for (Iterator i = this.messageDispatchers.iterator(); i
1044                                                            .hasNext();) {
1045                                                    ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i
1046                                                                    .next();
1047                                                    if (dispatcher.isTarget(message)) {
1048                                                            if (count > 0) {
1049                                                                    // separate message for each Session etc.
1050                                                                    message = message.deepCopy();
1051                                                            }
1052                                                            dispatcher.dispatch(message);
1053                                                            count++;
1054                                                    }
1055                                            }
1056                                    } catch (JMSException jmsEx) {
1057                                            handleAsyncException(jmsEx);
1058                                    }
1059                            } else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
1060                                    CapacityInfo info = (CapacityInfo) packet;
1061                                    flowControlSleepTime = info.getFlowControlTimeout();
1062                                    // System.out.println("SET FLOW TIMEOUT = " +
1063                                    // flowControlSleepTime + " FOR " + info);
1064                            } else if (packet.getPacketType() == Packet.KEEP_ALIVE
1065                                            && packet.isReceiptRequired()) {
1066                                    Receipt receipt = new Receipt();
1067                                    receipt.setCorrelationId(packet.getId());
1068                                    receipt.setReceiptRequired(false);
1069                                    try {
1070                                            asyncSendPacket(receipt);
1071                                    } catch (JMSException jmsEx) {
1072                                            handleAsyncException(jmsEx);
1073                                    }
1074                            }
1075                    }
1076            }
1077    
1078            /**
1079             * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
1080             */
1081            public void onException(JMSException jmsEx) {
1082                    // Got an exception propagated up from the transport channel
1083                    handleAsyncException(jmsEx);
1084                    isTransportOK = false;
1085                    try {
1086                            close();
1087                    } catch (JMSException ex) {
1088                            log.debug("Exception closing the connection", ex);
1089                    }
1090            }
1091    
1092            /**
1093             * Creates a <CODE>TopicSession</CODE> object.
1094             * 
1095             * @param transacted
1096             *            indicates whether the session is transacted
1097             * @param acknowledgeMode
1098             *            indicates whether the consumer or the client will acknowledge
1099             *            any messages it receives; ignored if the session is
1100             *            transacted. Legal values are
1101             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
1102             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1103             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1104             * @return a newly created topic session
1105             * @throws JMSException
1106             *             if the <CODE>TopicConnection</CODE> object fails to create
1107             *             a session due to some internal error or lack of support for
1108             *             the specific transaction and acknowledgement mode.
1109             * @see Session#AUTO_ACKNOWLEDGE
1110             * @see Session#CLIENT_ACKNOWLEDGE
1111             * @see Session#DUPS_OK_ACKNOWLEDGE
1112             */
1113            public TopicSession createTopicSession(boolean transacted,
1114                            int acknowledgeMode) throws JMSException {
1115                    return new ActiveMQTopicSession((ActiveMQSession) createSession(
1116                                    transacted, acknowledgeMode));
1117            }
1118    
1119            /**
1120             * Creates a connection consumer for this connection (optional operation).
1121             * This is an expert facility not used by regular JMS clients.
1122             * 
1123             * @param topic
1124             *            the topic to access
1125             * @param messageSelector
1126             *            only messages with properties matching the message selector
1127             *            expression are delivered. A value of null or an empty string
1128             *            indicates that there is no message selector for the message
1129             *            consumer.
1130             * @param sessionPool
1131             *            the server session pool to associate with this connection
1132             *            consumer
1133             * @param maxMessages
1134             *            the maximum number of messages that can be assigned to a
1135             *            server session at one time
1136             * @return the connection consumer
1137             * @throws JMSException
1138             *             if the <CODE>TopicConnection</CODE> object fails to create
1139             *             a connection consumer due to some internal error or invalid
1140             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
         * @throws javax.jms.InvalidDestinationException
         *             if an invalid topic is specified.
         * @throws javax.jms.InvalidSelectorException
         *             if the message selector is invalid.
1145             * @see javax.jms.ConnectionConsumer
1146             */
1147            public ConnectionConsumer createConnectionConsumer(Topic topic,
1148                            String messageSelector, ServerSessionPool sessionPool,
1149                            int maxMessages) throws JMSException {
1150                    checkClosed();
1151                    ensureClientIDInitialised();
1152                    ConsumerInfo info = new ConsumerInfo();
1153                    info.setConsumerId(this.handleIdGenerator.generateId());
1154                    info.setDestination(ActiveMQMessageTransformation
1155                                    .transformDestination(topic));
1156                    info.setSelector(messageSelector);
1157                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1158                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
1159                                    maxMessages);
1160            }
1161    
1162            /**
1163             * Creates a <CODE>QueueSession</CODE> object.
1164             * 
1165             * @param transacted
1166             *            indicates whether the session is transacted
1167             * @param acknowledgeMode
1168             *            indicates whether the consumer or the client will acknowledge
1169             *            any messages it receives; ignored if the session is
1170             *            transacted. Legal values are
1171             *            <code>Session.AUTO_ACKNOWLEDGE</code>,
1172             *            <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1173             *            <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1174             * @return a newly created queue session
1175             * @throws JMSException
1176             *             if the <CODE>QueueConnection</CODE> object fails to create
1177             *             a session due to some internal error or lack of support for
1178             *             the specific transaction and acknowledgement mode.
1179             * @see Session#AUTO_ACKNOWLEDGE
1180             * @see Session#CLIENT_ACKNOWLEDGE
1181             * @see Session#DUPS_OK_ACKNOWLEDGE
1182             */
1183            public QueueSession createQueueSession(boolean transacted,
1184                            int acknowledgeMode) throws JMSException {
1185                    return new ActiveMQQueueSession((ActiveMQSession) createSession(
1186                                    transacted, acknowledgeMode));
1187            }
1188    
1189            /**
1190             * Creates a connection consumer for this connection (optional operation).
1191             * This is an expert facility not used by regular JMS clients.
1192             * 
1193             * @param queue
1194             *            the queue to access
1195             * @param messageSelector
1196             *            only messages with properties matching the message selector
1197             *            expression are delivered. A value of null or an empty string
1198             *            indicates that there is no message selector for the message
1199             *            consumer.
1200             * @param sessionPool
1201             *            the server session pool to associate with this connection
1202             *            consumer
1203             * @param maxMessages
1204             *            the maximum number of messages that can be assigned to a
1205             *            server session at one time
1206             * @return the connection consumer
1207             * @throws JMSException
1208             *             if the <CODE>QueueConnection</CODE> object fails to create
1209             *             a connection consumer due to some internal error or invalid
1210             *             arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1211             * @throws InvalidDestinationException
1212             *             if an invalid queue is specified.
1213             * @throws InvalidSelectorException
1214             *             if the message selector is invalid.
1215             * @see javax.jms.ConnectionConsumer
1216             */
1217            public ConnectionConsumer createConnectionConsumer(Queue queue,
1218                            String messageSelector, ServerSessionPool sessionPool,
1219                            int maxMessages) throws JMSException {
1220                    checkClosed();
1221                    ensureClientIDInitialised();
1222                    ConsumerInfo info = new ConsumerInfo();
1223                    info.setConsumerId(this.handleIdGenerator.generateId());
1224                    info.setDestination(ActiveMQMessageTransformation
1225                                    .transformDestination(queue));
1226                    info.setSelector(messageSelector);
1227                    info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1228                    return new ActiveMQConnectionConsumer(this, sessionPool, info,
1229                                    maxMessages);
1230            }
1231    
1232            /**
1233             * Ensures that the clientID was manually specified and not auto-generated.
1234             * If the clientID was not specified this method will throw an exception.
1235             * This method is used to ensure that the clientID + durableSubscriber name
1236             * are used correctly.
1237             * 
1238             * @throws JMSException
1239             */
1240            public void checkClientIDWasManuallySpecified() throws JMSException {
1241                    if (!userSpecifiedClientID) {
1242                            throw new JMSException(
1243                                            "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1244                    }
1245            }
1246    
1247            /**
1248             * handle disconnect/reconnect events
1249             * 
1250             * @param event
1251             */
1252            public void statusChanged(TransportStatusEvent event) {
1253                    log.info("channel status changed: " + event);
1254                    if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
1255                            isTransportOK = true;
1256                            doReconnect();
1257    
1258                    } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
1259                            isTransportOK = false;
1260                            clearMessagesInProgress();
1261                    }
1262            }
1263    
1264            /**
1265             * send a Packet through the Connection - for internal use only
1266             * 
1267             * @param packet
1268             * @throws JMSException
1269             */
1270            public void asyncSendPacket(Packet packet) throws JMSException {
1271                    asyncSendPacket(packet, true);
1272            }
1273    
1274            /**
1275             * send a Packet through the Connection - for internal use only
1276             * 
1277             * @param packet
1278             * @param doSendWhileReconnecting
1279             * @throws JMSException
1280             */
1281            public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting)
1282                            throws JMSException {
1283                    if (isTransportOK
1284                                    && !closed
1285                                    && (doSendWhileReconnecting || transportChannel
1286                                                    .isTransportConnected())) {
1287                            packet.setId(packetIdGenerator.getNextShortSequence());
1288                            packet.setReceiptRequired(false);
1289                            if (packet.isJMSMessage() && flowControlSleepTime > 0) {
1290                                    try {
1291                                            Thread.sleep(flowControlSleepTime);
1292                                    } catch (InterruptedException e) {
1293                                    }
1294                            }
1295                            this.transportChannel.asyncSend(packet);
1296                    }
1297            }
1298    
1299            /**
1300             * send a Packet through a Connection - for internal use only
1301             * 
1302             * @param packet
1303             * @throws JMSException
1304             */
1305            public void syncSendPacket(Packet packet) throws JMSException {
1306                    syncSendPacket(packet, 0);
1307            }
1308    
1309            /**
1310             * Send a packet through a Connection - for internal use only
1311             * 
1312             * @param packet
1313             * @param timeout
1314             * @throws JMSException
1315             */
1316            public void syncSendPacket(Packet packet, int timeout) throws JMSException {
1317                    if (isTransportOK && !closed) {
1318                            Receipt receipt;
1319                            packet.setId(packetIdGenerator.getNextShortSequence());
1320                            packet.setReceiptRequired(true);
1321                            receipt = this.transportChannel.send(packet, timeout);
1322                            if (receipt != null) {
1323                                    if (receipt.isFailed()) {
1324                                            Throwable e = receipt.getException();
1325                                            if (e != null) {
1326                                                    throw JMSExceptionHelper.newJMSException(e);
1327                                            }
1328                                            throw new JMSException(
1329                                                            "syncSendPacket failed with unknown exception");
1330                                    }
1331                            }
1332                    } else {
1333                            if (closed) {
1334                                    throw new ConnectionClosedException();
1335                            } else {
1336                                    throw new JMSException(
1337                                                    "syncSendTimedOut: connection no longer OK");
1338                            }
1339                    }
1340            }
1341    
1342            public Receipt syncSendRequest(Packet packet) throws JMSException {
1343                    checkClosed();
1344                    if (isTransportOK && !closed) {
1345                            Receipt receipt;
1346                            packet.setReceiptRequired(true);
1347                            packet.setId(this.packetIdGenerator.getNextShortSequence());
1348    
1349                            receipt = this.transportChannel.send(packet);
1350                            if (receipt != null && receipt.isFailed()) {
1351                                    Throwable e = receipt.getException();
1352                                    if (e != null) {
1353                                            throw (JMSException) new JMSException(e.getMessage())
1354                                                            .initCause(e);
1355                                    }
1356                                    throw new JMSException(
1357                                                    "syncSendPacket failed with unknown exception");
1358                            }
1359                            return receipt;
1360                    } else {
1361                            if (closed) {
1362                                    throw new ConnectionClosedException();
1363                            } else {
1364                                    throw new JMSException(
1365                                                    "syncSendTimedOut: connection no longer OK");
1366                            }
1367                    }
1368            }
1369    
1370            // Properties
1371            // -------------------------------------------------------------------------
1372    
1373            /**
1374             * @return Returns the prefetchPolicy.
1375             */
1376            public ActiveMQPrefetchPolicy getPrefetchPolicy() {
1377                    return prefetchPolicy;
1378            }
1379    
1380            /**
1381             * @param prefetchPolicy
1382             *            The prefetchPolicy to set.
1383             */
1384            public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
1385                    this.prefetchPolicy = prefetchPolicy;
1386            }
1387    
1388            public int getSendCloseTimeout() {
1389                    return sendCloseTimeout;
1390            }
1391    
1392            public void setSendCloseTimeout(int sendCloseTimeout) {
1393                    this.sendCloseTimeout = sendCloseTimeout;
1394            }
1395    
1396            public int getSendConnectionInfoTimeout() {
1397                    return sendConnectionInfoTimeout;
1398            }
1399    
1400            public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
1401                    this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
1402            }
1403    
1404            public TransportChannel getTransportChannel() {
1405                    return transportChannel;
1406            }
1407    
1408            /**
1409             * Returns the clientID of the connection, forcing one to be generated if
1410             * one has not yet been configured
1411             */
1412            public String getInitializedClientID() throws JMSException {
1413                    ensureClientIDInitialised();
1414                    return this.clientID;
1415            }
1416    
1417            // Implementation methods
1418            // -------------------------------------------------------------------------
1419    
1420            /**
1421             * Used internally for adding Sessions to the Connection
1422             * 
1423             * @param session
1424             * @throws JMSException
1425             */
1426            protected void addSession(ActiveMQSession session) throws JMSException {
1427                    this.sessions.add(session);
1428                    addMessageDispatcher(session);
1429                    if (started.get()) {
1430                            session.start();
1431                    }
1432                    SessionInfo info = createSessionInfo(session);
1433                    info.setStarted(true);
1434                    asyncSendPacket(info);
1435            }
1436    
1437            /**
1438             * Used interanlly for removing Sessions from a Connection
1439             * 
1440             * @param session
1441             * @throws JMSException
1442             */
1443            protected void removeSession(ActiveMQSession session) throws JMSException {
1444                    this.sessions.remove(session);
1445                    removeMessageDispatcher(session);
1446                    SessionInfo info = createSessionInfo(session);
1447                    info.setStarted(false);
1448                    asyncSendPacket(info, false);
1449            }
1450    
1451            private SessionInfo createSessionInfo(ActiveMQSession session) {
1452                    SessionInfo info = new SessionInfo();
1453                    info.setClientId(clientID);
1454                    info.setSessionId(session.getSessionId());
1455                    info.setStartTime(session.getStartTime());
1456                    return info;
1457            }
1458    
1459            /**
1460             * Add a ConnectionConsumer
1461             * 
1462             * @param connectionConsumer
1463             * @throws JMSException
1464             */
1465            protected void addConnectionConsumer(
1466                            ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1467                    this.connectionConsumers.add(connectionConsumer);
1468                    addMessageDispatcher(connectionConsumer);
1469            }
1470    
1471            /**
1472             * Remove a ConnectionConsumer
1473             * 
1474             * @param connectionConsumer
1475             */
1476            protected void removeConnectionConsumer(
1477                            ActiveMQConnectionConsumer connectionConsumer) {
1478                    this.connectionConsumers.add(connectionConsumer);
1479                    removeMessageDispatcher(connectionConsumer);
1480            }
1481    
1482            /**
1483             * Add a Message dispatcher to receive messages from the Broker
1484             * 
1485             * @param messageDispatch
1486             * @throws JMSException
1487             *             if an internal error
1488             */
1489            protected void addMessageDispatcher(
1490                            ActiveMQMessageDispatcher messageDispatch) throws JMSException {
1491                    this.messageDispatchers.add(messageDispatch);
1492            }
1493    
1494            /**
1495             * Remove a Message dispatcher
1496             * 
1497             * @param messageDispatcher
1498             */
1499            protected void removeMessageDispatcher(
1500                            ActiveMQMessageDispatcher messageDispatcher) {
1501                    this.messageDispatchers.remove(messageDispatcher);
1502            }
1503    
1504            /**
1505             * Used for handling async exceptions
1506             * 
1507             * @param jmsEx
1508             */
1509            protected void handleAsyncException(JMSException jmsEx) {
1510                    if (!closed) {
1511                            if (this.exceptionListener != null) {
1512                                    this.exceptionListener.onException(jmsEx);
1513                            } else {
1514                                    log.warn(
1515                                                    "Async exception with no exception listener: " + jmsEx,
1516                                                    jmsEx);
1517                            }
1518                    }
1519            }
1520    
1521            protected void sendConnectionInfoToBroker() throws JMSException {
1522                    sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed, false);
1523            }
1524    
1525            /**
1526             * Send the ConnectionInfo to the Broker
1527             * 
1528             * @param timeout
1529             * @param isClosed
1530             * @throws JMSException
1531             */
1532            protected void sendConnectionInfoToBroker(int timeout, boolean forceResend,
1533                            boolean closing) throws JMSException {
1534                    // Can we skip sending the ConnectionInfo packet??
1535                    if (isConnectionInfoSentToBroker && !forceResend) {
1536                            return;
1537                    }
1538    
1539                    this.isConnectionInfoSentToBroker = true;
1540                    ensureClientIDInitialised();
1541                    ConnectionInfo info = new ConnectionInfo();
1542                    info.setClientId(this.clientID);
1543                    info.setHostName(IdGenerator.getHostName());
1544                    info.setUserName(userName);
1545                    info.setPassword(password);
1546                    info.setStartTime(startTime);
1547                    info.setStarted(started.get());
1548                    info.setClosed(closed || closing);
1549                    info.setClientVersion(connectionMetaData.getProviderVersion());
1550                    info.setWireFormatVersion(transportChannel
1551                                    .getCurrentWireFormatVersion());
1552                    if (info.getProperties() != null) {
1553                            info.getProperties().setProperty(ConnectionInfo.NO_DELAY_PROPERTY,
1554                                            new Boolean(!useAsyncSend).toString());
1555                    }
1556                    if (quickClose && info.isClosed()) {
1557                            asyncSendPacket(info);
1558                    } else {
1559                            syncSendPacket(info, timeout);
1560                    }
1561            }
1562    
1563            /**
1564             * Set the maximum amount of memory this Connection should use for buffered
1565             * inbound messages
1566             * 
1567             * @param newMemoryLimit
1568             *            the new memory limit in bytes
1569             */
1570            public void setConnectionMemoryLimit(int newMemoryLimit) {
1571                    memoryManager.setValueLimit(newMemoryLimit);
1572            }
1573    
1574            /**
1575             * Get the current value for the maximum amount of memory this Connection
1576             * should use for buffered inbound messages
1577             * 
1578             * @return the current limit in bytes
1579             */
1580            public int getConnectionMemoryLimit() {
1581                    return (int) memoryManager.getValueLimit();
1582            }
1583    
1584            /**
1585             * CapacityMonitorEventListener implementation called when the capacity of a
1586             * CapacityService changes
1587             * 
1588             * @param event
1589             */
1590            public void capacityChanged(CapacityMonitorEvent event) {
1591                    // send the event to broker ...
1592                    CapacityInfo info = new CapacityInfo();
1593                    info.setResourceName(event.getMonitorName());
1594                    info.setCapacity(event.getCapacity());
1595                    // System.out.println("Cap changed: " + event);
1596                    try {
1597                            asyncSendPacket(info, false);
1598                    } catch (JMSException e) {
1599                            JMSException jmsEx = new JMSException(
1600                                            "failed to send change in capacity");
1601                            jmsEx.setLinkedException(e);
1602                            handleAsyncException(jmsEx);
1603                    }
1604            }
1605    
1606            /**
1607             * @return a number unique for this connection
1608             */
1609            protected int getNextConsumerNumber() {
1610                    return this.consumerNumberGenerator.increment();
1611            }
1612    
1613            protected short generateSessionId() {
1614                    return this.sessionIdGenerator.getNextShortSequence();
1615            }
1616    
1617            private synchronized void ensureClientIDInitialised() {
1618                    if (this.clientID == null || this.clientID.trim().equals("")) {
1619                            this.clientID = this.clientIdGenerator.generateId();
1620                    }
1621                    transportChannel.setClientID(clientID);
1622                    this.clientIDSet = true;
1623            }
1624    
1625            protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
1626                    return boundedQueueManager.getMemoryBoundedQueue(name);
1627            }
1628    
1629            protected void doReconnect() {
1630                    try {
1631                            // send the Connection info again
1632                            sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
1633                            for (Iterator iter = sessions.iterator(); iter.hasNext();) {
1634                                    ActiveMQSession session = (ActiveMQSession) iter.next();
1635                                    SessionInfo sessionInfo = createSessionInfo(session);
1636                                    sessionInfo.setStarted(true);
1637                                    asyncSendPacket(sessionInfo, false);
1638                                    // send consumers
1639                                    for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator
1640                                                    .hasNext();) {
1641                                            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator
1642                                                            .next();
1643                                            ConsumerInfo consumerInfo = session
1644                                                            .createConsumerInfo(consumer);
1645                                            consumerInfo.setStarted(true);
1646                                            asyncSendPacket(consumerInfo, false);
1647                                    }
1648                                    // send producers
1649                                    for (Iterator producersIterator = session.producers.iterator(); producersIterator
1650                                                    .hasNext();) {
1651                                            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator
1652                                                            .next();
1653                                            ProducerInfo producerInfo = session
1654                                                            .createProducerInfo(producer);
1655                                            producerInfo.setStarted(true);
1656                                            asyncSendPacket(producerInfo, false);
1657                                    }
1658                                    // send the current capacity
1659                                    CapacityMonitorEvent event = memoryManager
1660                                                    .generateCapacityMonitorEvent();
1661                                    if (event != null) {
1662                                            capacityChanged(event);
1663                                    }
1664                            }
1665                    } catch (JMSException jmsEx) {
1666                            log.error("Failed to do reconnection");
1667                            handleAsyncException(jmsEx);
1668                            isTransportOK = false;
1669                    }
1670            }
1671    
1672            /**
1673             * @return Returns the useAsyncSend.
1674             */
1675            public boolean isUseAsyncSend() {
1676                    return useAsyncSend;
1677            }
1678    
1679            /**
1680             * @param useAsyncSend
1681             *            The useAsyncSend to set.
1682             */
1683            public void setUseAsyncSend(boolean useAsyncSend) {
1684                    this.useAsyncSend = useAsyncSend;
1685            }
1686    
1687            /**
1688             * @return Returns the cachingEnabled.
1689             */
1690            public boolean isCachingEnabled() {
1691                    return cachingEnabled;
1692            }
1693    
1694            /**
1695             * @param cachingEnabled
1696             *            The cachingEnabled to set.
1697             */
1698            public void setCachingEnabled(boolean cachingEnabled) {
1699                    this.cachingEnabled = cachingEnabled;
1700            }
1701    
1702            /**
1703             * @return Returns the j2EEcompliant.
1704             */
1705            public boolean isJ2EEcompliant() {
1706                    return J2EEcompliant;
1707            }
1708    
1709            /**
1710             * @param ecompliant
1711             *            The j2EEcompliant to set.
1712             */
1713            public void setJ2EEcompliant(boolean ecompliant) {
1714                    J2EEcompliant = ecompliant;
1715            }
1716    
1717            /**
1718             * @return Returns the internalConnection.
1719             */
1720            public boolean isInternalConnection() {
1721                    return internalConnection;
1722            }
1723    
1724            /**
1725             * @param internalConnection
1726             *            The internalConnection to set.
1727             */
1728            public void setInternalConnection(boolean internalConnection) {
1729                    this.internalConnection = internalConnection;
1730            }
1731    
1732            /**
1733             * @return Returns the doMessageCompression.
1734             */
1735            public boolean isDoMessageCompression() {
1736                    return doMessageCompression
1737                                    && transportChannel.doesSupportMessageCompression();
1738            }
1739    
1740            /**
1741             * @param doMessageCompression
1742             *            The doMessageCompression to set.
1743             */
1744            public void setDoMessageCompression(boolean doMessageCompression) {
1745                    this.doMessageCompression = doMessageCompression
1746                                    && transportChannel.doesSupportMessageCompression();
1747            }
1748    
1749            /**
1750             * @return Returns the doMessageFragmentation.
1751             */
1752            public boolean isDoMessageFragmentation() {
1753                    return doMessageFragmentation
1754                                    && transportChannel.doesSupportMessageFragmentation();
1755            }
1756    
1757            /**
1758             * @param doMessageFragmentation
1759             *            The doMessageFragmentation to set.
1760             */
1761            public void setDoMessageFragmentation(boolean doMessageFragmentation) {
1762                    this.doMessageFragmentation = doMessageFragmentation
1763                                    && transportChannel.doesSupportMessageFragmentation();
1764            }
1765    
1766            /**
1767             * @return Returns the messageCompressionLevel.
1768             */
1769            public int getMessageCompressionLevel() {
1770                    return messageCompressionLevel;
1771            }
1772    
1773            /**
1774             * @param messageCompressionLevel
1775             *            The messageCompressionLevel to set.
1776             */
1777            public void setMessageCompressionLevel(int messageCompressionLevel) {
1778                    this.messageCompressionLevel = messageCompressionLevel;
1779            }
1780    
1781            /**
1782             * @return Returns the messageCompressionLimit.
1783             */
1784            public int getMessageCompressionLimit() {
1785                    return messageCompressionLimit;
1786            }
1787    
1788            /**
1789             * @param messageCompressionLimit
1790             *            The messageCompressionLimit to set.
1791             */
1792            public void setMessageCompressionLimit(int messageCompressionLimit) {
1793                    this.messageCompressionLimit = messageCompressionLimit;
1794            }
1795    
1796            /**
1797             * @return Returns the messageCompressionStrategy.
1798             */
1799            public int getMessageCompressionStrategy() {
1800                    return messageCompressionStrategy;
1801            }
1802    
1803            /**
1804             * @param messageCompressionStrategy
1805             *            The messageCompressionStrategy to set.
1806             */
1807            public void setMessageCompressionStrategy(int messageCompressionStrategy) {
1808                    this.messageCompressionStrategy = messageCompressionStrategy;
1809            }
1810    
1811            /**
1812             * @return Returns the messageFragmentationLimit.
1813             */
1814            public int getMessageFragmentationLimit() {
1815                    return messageFragmentationLimit;
1816            }
1817    
1818            /**
1819             * @param messageFragmentationLimit
1820             *            The messageFragmentationLimit to set.
1821             */
1822            public void setMessageFragmentationLimit(int messageFragmentationLimit) {
1823                    this.messageFragmentationLimit = messageFragmentationLimit;
1824            }
1825    
1826            /**
1827             * @return Returns the disableTimeStampsByDefault.
1828             */
1829            public boolean isDisableTimeStampsByDefault() {
1830                    return disableTimeStampsByDefault;
1831            }
1832    
1833            /**
1834             * @param disableTimeStampsByDefault
1835             *            The disableTimeStampsByDefault to set.
1836             */
1837            public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
1838                    this.disableTimeStampsByDefault = disableTimeStampsByDefault;
1839            }
1840    
1841            /**
1842             * Causes pre-serialization of messages before send By default this is on
1843             * 
1844             * @return Returns the prePrepareMessageOnSend.
1845             */
1846            public boolean isPrepareMessageBodyOnSend() {
1847                    return prepareMessageBodyOnSend;
1848            }
1849    
1850            /**
1851             * Causes pre-serialization of messages before send By default this is on
1852             * 
1853             * @param prePrepareMessageOnSend
1854             *            The prePrepareMessageOnSend to set.
1855             */
1856            public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
1857                    this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
1858            }
1859    
1860            /**
1861             * @return Returns the copyMessageOnSend.
1862             */
1863            public boolean isCopyMessageOnSend() {
1864                    return copyMessageOnSend;
1865            }
1866    
1867            /**
1868             * @param copyMessageOnSend
1869             *            The copyMessageOnSend to set.
1870             */
1871            public void setCopyMessageOnSend(boolean copyMessageOnSend) {
1872                    this.copyMessageOnSend = copyMessageOnSend;
1873            }
1874    
1875            /**
1876             * @return Returns the quickClose.
1877             */
1878            public boolean isQuickClose() {
1879                    return quickClose;
1880            }
1881    
1882            /**
1883             * @param quickClose
1884             *            The quickClose to set.
1885             */
1886            public void setQuickClose(boolean quickClose) {
1887                    this.quickClose = quickClose;
1888            }
1889    
1890            /**
1891             * @return Returns the optimizedMessageDispatch.
1892             */
1893            public boolean isOptimizedMessageDispatch() {
1894                    return optimizedMessageDispatch;
1895            }
1896    
1897            /**
1898             * @param optimizedMessageDispatch
1899             *            The optimizedMessageDispatch to set.
1900             */
1901            public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
1902                    this.optimizedMessageDispatch = optimizedMessageDispatch;
1903            }
1904    
1905            protected void clearMessagesInProgress() {
1906                    for (Iterator i = sessions.iterator(); i.hasNext();) {
1907                            ActiveMQSession session = (ActiveMQSession) i.next();
1908                            session.clearMessagesInProgress();
1909                    }
1910            }
1911    
1912            /**
1913             * Tells the broker to destroy a destination.
1914             * 
1915             * @param destination
1916             */
1917            public void destroyDestination(ActiveMQDestination destination)
1918                            throws JMSException {
1919                    BrokerAdminCommand command = new BrokerAdminCommand();
1920                    command.setCommand(BrokerAdminCommand.DESTROY_DESTINATION);
1921                    command.setDestination(destination);
1922                    syncSendPacket(command);
1923            }
1924    
1925            /**
1926             * Cleans up this connection so that it's state is as if the connection was
1927             * just created. This allows the Resource Adapter to clean up a connection
1928             * so that it can be reused without having to close and recreate the
1929             * connection.
1930             * 
1931             * @param sessionId
1932             */
1933            public void cleanup() throws JMSException {
1934    
1935                    try {
1936                            for (Iterator i = this.sessions.iterator(); i.hasNext();) {
1937                                    ActiveMQSession s = (ActiveMQSession) i.next();
1938                                    s.close();
1939                            }
1940                            for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
1941                                    ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
1942                                                    .next();
1943                                    c.close();
1944                            }
1945                            this.connectionConsumers.clear();
1946                            this.messageDispatchers.clear();
1947                    } finally {
1948                            this.sessions.clear();
1949                            started.set(false);
1950                    }
1951    
1952                    setExceptionListener(null);
1953                    clientIDSet = false;
1954                    isConnectionInfoSentToBroker = false;
1955    
1956                    CleanupConnectionInfo cleanupInfo = new CleanupConnectionInfo();
1957                    cleanupInfo.setClientId(getClientID());
1958                    asyncSendPacket(cleanupInfo);
1959            }
1960    
1961            /**
1962             * Changes the associated username/password that is associated with this
1963             * connection. If the connection has been used, you must called cleanup()
1964             * before calling this method.
1965             * 
1966             * @throws IllegalStateException
1967             *             if the connection is in used.
1968             * @param sessionId
1969             */
1970            public void changeUserInfo(String theUserName, String thePassword)
1971                            throws JMSException {
1972                    if (isConnectionInfoSentToBroker)
1973                            throw new IllegalStateException(
1974                                            "changeUserInfo used Connection is not allowed");
1975    
1976                    this.userName = theUserName;
1977                    this.password = thePassword;
1978            }
1979    
1980            protected void addToTransientConsumedRedeliverCache(ActiveMQMessage message) {
1981                    transientConsumedRedeliverCache.add(message);
1982            }
1983    
1984            protected void replayTransientConsumedRedeliveredMessages(
1985                            ActiveMQSession session, ActiveMQMessageConsumer consumer)
1986                            throws JMSException {
1987                    if (consumer.getDestination().isTopic()
1988                                    && !transientConsumedRedeliverCache.isEmpty()) {
1989                            Filter filter = getFilterFactory().createFilter(
1990                                            consumer.getDestination(), consumer.getMessageSelector());
1991                            if (consumer.isNoLocal()) {
1992                                    filter = new AndFilter(filter, new NoLocalFilter(clientID));
1993                            }
1994                            for (Iterator i = transientConsumedRedeliverCache.iterator(); i
1995                                            .hasNext();) {
1996                                    ActiveMQMessage message = (ActiveMQMessage) i.next();
1997                                    if (filter.matches(message)) {
1998                                            transientConsumedRedeliverCache.remove(message);
1999                                            message.setMessageAcknowledge(session);
2000                                            message.setJMSRedelivered(true);
2001                                            message.setConsumerNos(new int[] { consumer
2002                                                            .getConsumerNumber() });
2003                                            consumer.processMessage(message);
2004                                    }
2005                            }
2006                    }
2007            }
2008    
2009            private FilterFactory getFilterFactory() {
2010                    if (filterFactory == null) {
2011                            filterFactory = new FilterFactoryImpl();
2012                    }
2013                    return filterFactory;
2014            }
2015    
2016            protected void startTemporaryDestination(ActiveMQDestination dest)
2017                            throws JMSException {
2018                    if (dest != null && dest.isTemporary()) {
2019                            TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2020                                            .get(dest);
2021                            if (event == null) {
2022                                    event = new TempDestinationAdvisoryEvent(dest, true);
2023                                    tempDestinationMap.put(dest, event);
2024                                    ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2025                                    msg.setObject(event);
2026                                    msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2027                                    msg.setJMSDestination(dest.getTopicForTempAdvisory());
2028                                    msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2029                                                    + " .started");
2030                                    this.syncSendPacket(msg);
2031                            }
2032                    }
2033            }
2034    
2035            protected void stopTemporaryDestination(ActiveMQDestination dest)
2036                            throws JMSException {
2037                    if (dest != null && dest.isTemporary()) {
2038                            TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2039                                            .remove(dest);
2040                            if (event != null) {
2041                                    event.setStarted(false);
2042                                    ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2043                                    msg.setObject(event);
2044                                    msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2045                                    msg.setJMSDestination(dest.getTopicForTempAdvisory());
2046                                    msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2047                                                    + " .stopped");
2048                                    this.syncSendPacket(msg);
2049                            }
2050                    }
2051            }
2052    
2053            protected void closeTemporaryDestinations() throws JMSException {
2054                    for (Iterator i = tempDestinationMap.keySet().iterator(); i.hasNext();) {
2055                            ActiveMQDestination dest = (ActiveMQDestination) i.next();
2056                            stopTemporaryDestination(dest);
2057                    }
2058            }
2059    
2060            protected void startAdvisoryForTempDestination(Destination d)
2061                            throws JMSException {
2062                    if (d != null) {
2063                            ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2064                                            .transformDestination(d);
2065                            if (dest.isTemporary()) {
2066                                    TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2067                                                    .get(dest);
2068                                    if (test == null) {
2069                                            test = new TempDestinationAdvisor(this, dest);
2070                                            test.start();
2071                                            validDestinationsMap.put(dest, test);
2072                                    }
2073                            }
2074                    }
2075            }
2076    
2077            protected void stopAdvisoryForTempDestination(ActiveMQDestination d)
2078                            throws JMSException {
2079                    if (d != null) {
2080                            ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2081                                            .transformDestination(d);
2082                            if (dest.isTemporary()) {
2083                                    TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2084                                                    .remove(dest);
2085                                    if (test != null) {
2086                                            test.stop();
2087                                    }
2088                            }
2089                    }
2090            }
2091    
2092            protected final void validateDestination(ActiveMQDestination dest)
2093                            throws JMSException {
2094                    if (dest != null) {
2095                            if (dest.isTemporary()) {
2096                                    TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2097                                                    .get(dest);
2098                                    if (dest.isDeleted() || test == null || !test.isActive(dest)) {
2099                                            throw new JMSException(
2100                                                            "Cannot publish to a deleted Destination: " + dest);
2101                                    }
2102                            }
2103                    }
2104            }
2105    
2106            /**
2107             * @return Returns the resourceManagerId.
2108             * @throws JMSException
2109             */
2110            synchronized public String getResourceManagerId() throws JMSException {
2111                    if (resourceManagerId == null) {
2112                            resourceManagerId = determineResourceManagerId();
2113                    }
2114                    return resourceManagerId;
2115            }
2116    
2117            /**
2118             * Get's the resource manager id.
2119             */
2120            private String determineResourceManagerId() throws JMSException {
2121    
2122                    XATransactionInfo info = new XATransactionInfo();
2123                    info.setType(TransactionInfo.GET_RM_ID);
2124    
2125                    ResponseReceipt receipt = (ResponseReceipt) syncSendRequest(info);
2126                    String rmId = (String) receipt.getResult();
2127                    assert rmId != null;
2128                    return rmId;
2129            }
2130    
2131    
2132    }