001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.broker.impl;
020    
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.Hashtable;
025    import java.util.Iterator;
026    import java.util.Map;
027    import javax.jms.JMSException;
028    import javax.naming.Context;
029    import javax.transaction.xa.XAException;
030    import org.activemq.broker.Broker;
031    import org.activemq.broker.BrokerAdmin;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.broker.ConsumerInfoListener;
034    import org.activemq.capacity.DelegateCapacityMonitor;
035    import org.activemq.io.util.MemoryBoundedObjectManager;
036    import org.activemq.io.util.MemoryBoundedQueueManager;
037    import org.activemq.jndi.ReadOnlyContext;
038    import org.activemq.message.ActiveMQDestination;
039    import org.activemq.message.ActiveMQMessage;
040    import org.activemq.message.ActiveMQXid;
041    import org.activemq.message.ConnectionInfo;
042    import org.activemq.message.ConsumerInfo;
043    import org.activemq.message.MessageAck;
044    import org.activemq.message.ProducerInfo;
045    import org.activemq.security.SecurityAdapter;
046    import org.activemq.service.DeadLetterPolicy;
047    import org.activemq.service.MessageContainerAdmin;
048    import org.activemq.service.MessageContainerManager;
049    import org.activemq.service.RedeliveryPolicy;
050    import org.activemq.service.Transaction;
051    import org.activemq.service.TransactionManager;
052    import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager;
053    import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
054    import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
055    import org.activemq.service.impl.DurableTopicMessageContainerManager;
056    import org.activemq.store.PersistenceAdapter;
057    import org.activemq.store.PersistenceAdapterFactory;
058    import org.activemq.store.TransactionStore;
059    import org.activemq.store.vm.VMPersistenceAdapter;
060    import org.activemq.store.vm.VMTransactionManager;
061    import org.activemq.util.Callback;
062    import org.activemq.util.ExceptionTemplate;
063    import org.activemq.util.JMSExceptionHelper;
064    import org.apache.commons.logging.Log;
065    import org.apache.commons.logging.LogFactory;
066    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
067    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
068    
069    /**
070     * The default {@link Broker} implementation
071     *
072     * @version $Revision: 1.1.1.1 $
073     */
074    public class DefaultBroker extends DelegateCapacityMonitor implements Broker, BrokerAdmin {
075    
076        private static final Log log = LogFactory.getLog(DefaultBroker.class);
077    
078        protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
079        protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory";
080    
081        protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};
082    
083        private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb
084    
085        private PersistenceAdapter persistenceAdapter;
086        private TransactionManager transactionManager;
087        private MessageContainerManager[] containerManagers;
088        private File tempDir;
089        private MemoryBoundedObjectManager memoryManager;
090        private MemoryBoundedQueueManager queueManager;
091        private TransactionStore preparedTransactionStore;
092        private final String brokerName;
093        private final String brokerClusterName;
094        private Map containerManagerMap;
095        private CopyOnWriteArrayList consumerInfoListeners;
096        private MessageContainerManager persistentTopicMCM;
097        private MessageContainerManager transientTopicMCM;
098        private MessageContainerManager transientQueueMCM;
099        private DurableQueueBoundedMessageManager persistentQueueMCM;
100        private SecurityAdapter securityAdapter;
101        private RedeliveryPolicy redeliveryPolicy;
102        private DeadLetterPolicy deadLetterPolicy;
103        private AdvisorySupport  advisory;
104        private Map messageConsumers = new ConcurrentHashMap();
105    
106    
107    
108        public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager) {
109            this.brokerName = brokerName;
110            this.brokerClusterName = brokerClusterName;
111            this.memoryManager = memoryManager;
112            queueManager = new MemoryBoundedQueueManager(memoryManager);
113            setDelegate(memoryManager);
114            containerManagerMap = new ConcurrentHashMap();
115            consumerInfoListeners = new CopyOnWriteArrayList();
116            this.advisory = new AdvisorySupport(this);
117        }
118    
119        public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) {
120            this(brokerName, "default", memoryManager);
121        }
122    
123        public DefaultBroker(String brokerName, String cluserName) {
124            this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
125        }
126    
127        public DefaultBroker(String brokerName) {
128            this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
129        }
130    
131        public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
132            this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
133            this.persistenceAdapter = persistenceAdapter;
134        }
135    
136        public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
137            this(brokerName);
138            this.persistenceAdapter = persistenceAdapter;
139        }
140    
141        /**
142         * Start this Service
143         *
144         * @throws JMSException
145         */
146        public void start() throws JMSException {
147            if (redeliveryPolicy == null) {
148                redeliveryPolicy = new RedeliveryPolicy();
149            }
150            if (deadLetterPolicy == null){
151                deadLetterPolicy = new DeadLetterPolicy(this);
152            }
153            if (persistenceAdapter == null) {
154                persistenceAdapter = createPersistenceAdapter();
155            }
156            persistenceAdapter.start();
157    
158            if (transactionManager == null) {
159                preparedTransactionStore = persistenceAdapter.createTransactionStore();
160                transactionManager = new VMTransactionManager(this, preparedTransactionStore);
161            }
162    
163            // force containers to be created
164            if (containerManagerMap.isEmpty()) {
165                makeDefaultContainerManagers();
166            }
167            getContainerManagers();
168    
169            for (int i = 0; i < containerManagers.length; i++) {
170                containerManagers[i].setDeadLetterPolicy(this.deadLetterPolicy);
171                containerManagers[i].start();
172            }
173            transactionManager.start();
174        }
175    
176    
177        /**
178         * stop this Service
179         *
180         * @throws JMSException
181         */
182    
183        public void stop() throws JMSException {
184            ExceptionTemplate template = new ExceptionTemplate();
185    
186            if (containerManagers != null) {
187                for (int i = 0; i < containerManagers.length; i++) {
188                    final MessageContainerManager containerManager = containerManagers[i];
189                    template.run(new Callback() {
190                        public void execute() throws Throwable {
191                            containerManager.stop();
192                        }
193                    });
194                }
195            }
196            if (transactionManager != null) {
197                template.run(new Callback() {
198                    public void execute() throws Throwable {
199                        transactionManager.stop();
200                    }
201                });
202            }
203    
204            template.run(new Callback() {
205                public void execute() throws Throwable {
206                    persistenceAdapter.stop();
207                }
208            });
209    
210            template.throwJMSException();
211        }
212    
213        // Broker interface
214        //-------------------------------------------------------------------------
215    
216        public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
217            if (securityAdapter != null) {
218                securityAdapter.authorizeConnection(client, info);
219            }
220            advisory.addConnection(client,info);
221        }
222    
223        public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
224            if (transactionManager != null) {
225                transactionManager.cleanUpClient(client);
226            }
227            advisory.removeConnection(client,info);
228        }
229    
230        public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
231            if (securityAdapter != null) {
232                securityAdapter.authorizeProducer(client, info);
233            }
234            advisory.addProducer(client,info);
235        }
236    
237        public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
238            advisory.removeProducer(client,info);
239        }
240    
241        /**
242         * Add an active message consumer
243         */
244        public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
245            validateConsumer(info);
246            if (securityAdapter != null) {
247                securityAdapter.authorizeConsumer(client, info);
248            }
249            advisory.addAdvisory(client, info);
250            MessageContainerManager[] array = getContainerManagers();
251            for (int i = 0;i < array.length;i++) {
252                array[i].addMessageConsumer(client, info);
253            }
254            fireConsumerInfo(client, info);
255            messageConsumers.put(info,client);
256        }
257    
258        /**
259         * remove an active message consumer
260         */
261        public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
262            validateConsumer(info);
263            advisory.removeAdvisory(client, info);
264            for (int i = 0;i < containerManagers.length;i++) {
265                containerManagers[i].removeMessageConsumer(client, info);
266            }
267            fireConsumerInfo(client, info);
268            messageConsumers.remove(info);
269        }
270    
271    
272        /**
273         * send a message to the broker
274         */
275        public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
276            checkValid();
277            ActiveMQDestination destination = message.getJMSActiveMQDestination();
278            if (destination == null) {
279                throw new JMSException("No destination specified for the Message");
280            }
281            if (message.getJMSMessageID() == null && !destination.isAdvisory()) {
282                throw new JMSException("No messageID specified for the Message");
283            }
284            associateTransaction(message);
285            try {
286                if (destination.isComposite()) {
287                    boolean first = true;
288                    for (Iterator iter = destination.getChildDestinations().iterator();iter.hasNext();) {
289                        ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
290                        // lets shallow copy just in case
291                        if (first) {
292                            first = false;
293                        }
294                        else {
295                            message = message.shallowCopy();
296                        }
297                        message.setJMSDestination(childDestination);
298                        doMessageSend(client, message);
299                    }
300                }
301                else {
302                    if (destination.isTempDestinationAdvisory() && !client.isBrokerConnection()) {
303                        advisory.processTempDestinationAdvisory(client,message);
304                    }
305                    doMessageSend(client, message);
306                }
307            }
308            finally {
309                disAssociateTransaction();
310            }
311        }
312    
313        /**
314         * Acknowledge consumption of a message by the Message Consumer
315         */
316        public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
317            
318            associateTransaction(ack);
319            try {
320                    for (int i = 0; i < containerManagers.length; i++) {
321                        containerManagers[i].acknowledgeMessage(client, ack);
322                    }
323            } finally {
324                disAssociateTransaction();
325            }
326            
327        }
328    
329        public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
330            for (int i = 0; i < containerManagers.length; i++) {
331                containerManagers[i].deleteSubscription(clientId, subscriberName);
332            }
333        }
334    
335    
336        /**
337         * Start a transaction.
338         *
339         * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, java.lang.String)
340         */
341        public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
342            transactionManager.createLocalTransaction(client, transactionId);
343        }
344    
345        public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
346            try {
347                Transaction transaction = transactionManager.getLocalTransaction(transactionId);
348                transaction.commit(true);
349            }
350            catch (XAException e) {
351                // TODO: I think the XAException should propagate all the way to the client.
352                throw (JMSException) new JMSException(e.getMessage()).initCause(e);
353            }
354        }
355    
356        /**
357         * rollback a transaction
358         */
359        public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
360            try {
361                Transaction transaction = transactionManager.getLocalTransaction(transactionId);
362                transaction.rollback();
363            }
364            catch (XAException e) {
365                // TODO: I think the XAException should propagate all the way to the client.
366                throw (JMSException) new JMSException(e.getMessage()).initCause(e);
367            }
368        }
369    
370        /**
371         * Starts an XA Transaction.
372         *
373         * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
374         */
375        public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
376            transactionManager.createXATransaction(client, xid);
377        }
378    
379        /**
380         * Prepares an XA Transaciton.
381         *
382         * @see org.activemq.broker.Broker#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
383         */
384        public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
385            Transaction transaction = transactionManager.getXATransaction(xid);
386            return transaction.prepare();
387        }
388    
389        /**
390         * Rollback an XA Transaction.
391         *
392         * @see org.activemq.broker.Broker#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
393         */
394        public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
395            Transaction transaction = transactionManager.getXATransaction(xid);
396            transaction.rollback();
397        }
398    
399        /**
400         * Commit an XA Transaction.
401         *
402         * @see org.activemq.broker.Broker#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
403         */
404        public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
405            Transaction transaction = transactionManager.getXATransaction(xid);
406            transaction.commit(onePhase);
407        }
408    
409        /**
410         * Gets the prepared XA transactions.
411         *
412         * @see org.activemq.broker.Broker#getPreparedTransactions(org.activemq.broker.BrokerClient)
413         */
414        public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
415            return transactionManager.getPreparedXATransactions();
416        }
417        
418        
419    
420    
421        // Properties
422        //-------------------------------------------------------------------------
423    
424        /**
425         * Get a temp directory - used for spooling
426         *
427         * @return a File ptr to the directory
428         */
429        public File getTempDir() {
430            if (tempDir == null) {
431                String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
432                tempDir = new File(dirName);
433            }
434            return tempDir;
435        }
436    
437        public String getBrokerName() {
438            return brokerName;
439        }
440    
441        /**
442         * @return Returns the brokerClusterName.
443         */
444        public String getBrokerClusterName() {
445            return brokerClusterName;
446        }
447    
448        public void setTempDir(File tempDir) {
449            this.tempDir = tempDir;
450        }
451    
452        public MessageContainerManager[] getContainerManagers() {
453            if (containerManagers == null) {
454                containerManagers = createContainerManagers();
455            }
456            return containerManagers;
457        }
458    
459        public Map getContainerManagerMap() {
460            return containerManagerMap;
461        }
462    
463        public void setContainerManagerMap(Map containerManagerMap) {
464            this.containerManagerMap = containerManagerMap;
465            this.containerManagers = null;
466        }
467    
468        public PersistenceAdapter getPersistenceAdapter() {
469            return persistenceAdapter;
470        }
471    
472        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
473            this.persistenceAdapter = persistenceAdapter;
474        }
475    
476        public TransactionManager getTransactionManager() {
477            return transactionManager;
478        }
479    
480        public void setTransactionManager(TransactionManager transactionManager) {
481            this.transactionManager = transactionManager;
482        }
483    
484        public SecurityAdapter getSecurityAdapter() {
485            return securityAdapter;
486        }
487    
488        public void setSecurityAdapter(SecurityAdapter securityAdapter) {
489            this.securityAdapter = securityAdapter;
490        }
491    
492        public RedeliveryPolicy getRedeliveryPolicy() {
493            return redeliveryPolicy;
494        }
495    
496        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
497            this.redeliveryPolicy = redeliveryPolicy;
498        }
499    
500        public TransactionStore getPreparedTransactionStore() {
501            return preparedTransactionStore;
502        }
503    
504        public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) {
505            this.preparedTransactionStore = preparedTransactionStore;
506        }
507        
508        /**
509         * @return the DeadLetterPolicy
510         */
511        public DeadLetterPolicy getDeadLetterPolicy(){
512            return deadLetterPolicy;
513        }
514        
515        /**
516         * set the dead letter policy
517         * @param deadLetterPolicy
518         */
519        public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy){
520            this.deadLetterPolicy = deadLetterPolicy;
521        }
522    
523        /**
524         * @return Returns the maximumMemoryUsage.
525         */
526        public long getMaximumMemoryUsage() {
527            return memoryManager.getValueLimit();
528        }
529    
530        /**
531         * @param maximumMemoryUsage The maximumMemoryUsage to set.
532         */
533        public void setMaximumMemoryUsage(long maximumMemoryUsage) {
534            this.memoryManager.setValueLimit(maximumMemoryUsage);
535        }
536    
537    
538        public Context getDestinationContext(Hashtable environment) {
539            Map data = new ConcurrentHashMap();
540            for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) {
541                Map.Entry entry = (Map.Entry) iter.next();
542                String name = entry.getKey().toString();
543                MessageContainerManager manager = (MessageContainerManager) entry.getValue();
544                Context context = new ReadOnlyContext(environment, manager.getDestinations());
545                data.put(name, context);
546            }
547            return new ReadOnlyContext(environment, data);
548        }
549    
550        // Implementation methods
551        //-------------------------------------------------------------------------
552    
553    
554        protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
555            if (securityAdapter != null) {
556                securityAdapter.authorizeSendMessage(client, message);
557            }
558            ActiveMQDestination dest = message.getJMSActiveMQDestination();
559            if (dest.isTopic()){
560                if (message.isPersistent() && !dest.isTemporary()){
561                    persistentTopicMCM.sendMessage(client,message);
562                }
563                transientTopicMCM.sendMessage(client, message);
564            }else {
565                transientQueueMCM.sendMessage(client, message);
566                persistentQueueMCM.sendMessage(client, message);
567            }
568        }
569    
570        /**
571         * Factory method to create a default persistence adapter
572         *
573         * @return
574         */
575        protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
576            File directory = new File(getStoreDirectory());
577    
578            // lets use reflection to avoid runtime dependency on persistence libraries
579            PersistenceAdapter answer = null;
580            String property = System.getProperty(PERSISTENCE_ADAPTER_FACTORY);
581            if (property != null) {
582                answer = tryCreatePersistenceAdapter(property, directory, false);
583            }
584            if (answer == null) {
585                answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true);
586            }
587            if (answer != null) {
588                return answer;
589            }
590            else {
591                log.warn("Default message store (journal+derby) could not be found in the classpath or property '" + PERSISTENCE_ADAPTER_FACTORY
592                        + "' not specified so defaulting to use RAM based message persistence");
593                return new VMPersistenceAdapter();
594            }
595        }
596    
597        protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
598            Class adapterClass = loadClass(className, ignoreErrors);
599            if (adapterClass != null) {
600                try {
601                    PersistenceAdapterFactory factory = (PersistenceAdapterFactory) adapterClass.newInstance();
602                    PersistenceAdapter answer = factory.createPersistenceAdapter(directory, memoryManager);
603                    log.info("Persistence adapter created using: " + className);
604                    return answer;
605                }
606                catch (IOException cause) {
607                    throw createInstantiateAdapterException(className, (Exception) cause);
608                }
609                catch (Throwable e) {
610                    if (!ignoreErrors) {
611                        throw createInstantiateAdapterException(className, e);
612                    }
613                }
614            }
615            return null;
616        }
617    
618        protected JMSException createInstantiateAdapterException(String className, Throwable e) {
619            return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: "
620                    + className + ". Reason: " + e, e);
621        }
622    
623        /**
624         * Tries to load the given class from the current context class loader or
625         * class loader which loaded us or return null if the class could not be found
626         */
627        protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
628            try {
629                return Thread.currentThread().getContextClassLoader().loadClass(name);
630            }
631            catch (ClassNotFoundException e) {
632                try {
633                    return getClass().getClassLoader().loadClass(name);
634                }
635                catch (ClassNotFoundException e2) {
636                    if (ignoreErrors) {
637                        log.trace("Could not find class: " + name + " on the classpath");
638                        return null;
639                    }
640                    else {
641                        throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
642                    }
643                }
644            }
645        }
646    
647        protected String getStoreDirectory() {
648            String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(brokerName);
649            return System.getProperty(PROPERTY_STORE_DIRECTORY, defaultDirectory);
650        }
651    
652        /**
653         * Factory method to create the default container managers
654         *
655         * @return
656         */
657        protected MessageContainerManager[] createContainerManagers() {
658            int size = containerManagerMap.size();
659            MessageContainerManager[] answer = new MessageContainerManager[size];
660            containerManagerMap.values().toArray(answer);
661            return answer;
662        }
663    
664        protected void makeDefaultContainerManagers() {
665            transientTopicMCM = new TransientTopicBoundedMessageManager(queueManager);
666            containerManagerMap.put("transientTopicContainer", transientTopicMCM);
667            persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy, deadLetterPolicy);
668            containerManagerMap.put("persistentTopicContainer", persistentTopicMCM);
669            persistentQueueMCM = new DurableQueueBoundedMessageManager(persistenceAdapter, queueManager, redeliveryPolicy, deadLetterPolicy);
670            containerManagerMap.put("persistentQueueContainer", persistentQueueMCM);
671            transientQueueMCM = new TransientQueueBoundedMessageManager(queueManager,redeliveryPolicy, deadLetterPolicy);
672            containerManagerMap.put("transientQueueContainer", transientQueueMCM);
673        }
674    
675        /**
676         * Ensures the consumer is valid, throwing a meaningful exception if not
677         *
678         * @param info
679         * @throws JMSException
680         */
681        protected void validateConsumer(ConsumerInfo info) throws JMSException {
682            if (info.getConsumerId() == null) {
683                throw new JMSException("No consumerId specified for the ConsumerInfo");
684            }
685        }
686    
687        protected void checkValid() throws JMSException {
688            if (containerManagers == null) {
689                throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
690            }
691        }
692    
693        /**
694         * Add a ConsumerInfoListener to the Broker
695         *
696         * @param l
697         */
698        public void addConsumerInfoListener(ConsumerInfoListener l) {
699            if (l != null){
700                consumerInfoListeners.add(l);
701                //fire any existing infos to the listener
702                for (Iterator i = messageConsumers.entrySet().iterator(); i.hasNext();){
703                    Map.Entry entry = (Map.Entry)i.next();
704                    ConsumerInfo info = (ConsumerInfo) entry.getKey();
705                    BrokerClient client = (BrokerClient) entry.getValue();
706                    l.onConsumerInfo(client, info);
707                }
708            }
709        }
710    
711        /**
712         * Remove a ConsumerInfoListener from the Broker
713         *
714         * @param l
715         */
716        public void removeConsumerInfoListener(ConsumerInfoListener l) {
717            consumerInfoListeners.remove(l);
718        }
719    
720        protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
721            for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) {
722                ConsumerInfoListener l = (ConsumerInfoListener) i.next();
723                l.onConsumerInfo(client, info);
724            }
725        }
726    
727        /**
728         * @return the MessageContainerManager for durable topics
729         */
730        public MessageContainerManager getPersistentTopicContainerManager() {
731            return persistentTopicMCM;
732        }
733    
734        /**
735         * @return the MessageContainerManager for transient topics
736         */
737        public MessageContainerManager getTransientTopicContainerManager() {
738            return transientTopicMCM;
739        }
740    
741        /**
742         * @return the MessageContainerManager for persistent queues
743         */
744        public MessageContainerManager getPersistentQueueContainerManager() {
745            return persistentQueueMCM;
746        }
747    
748        /**
749         * @return the MessageContainerManager for transient queues
750         */
751        public MessageContainerManager getTransientQueueContainerManager() {
752            return transientQueueMCM;
753        }
754    
755        /**
756         * @see org.activemq.broker.Broker#getBrokerAdmin()
757         */
758        public BrokerAdmin getBrokerAdmin() {
759            return this;
760        }
761    
762        public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
763            for (int i = 0; i < containerManagers.length; i++) {
764                containerManagers[i].createMessageContainer(dest);
765            }
766        }
767    
768        public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException {
769            for (int i = 0; i < containerManagers.length; i++) {
770                containerManagers[i].destroyMessageContainer(dest);
771            }
772        }
773    
774        public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException {
775            for (int i = 0; i < containerManagers.length; i++) {
776                Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins();
777                MessageContainerAdmin mca = (MessageContainerAdmin) messageContainerAdmins.get(dest);
778                if( mca != null ) {
779                    return mca;
780                }
781            }
782            return null;
783        }
784    
785        /**
786         * @throws JMSException
787         * @see org.activemq.broker.BrokerAdmin#listDestinations()
788         */
789        public MessageContainerAdmin[] listMessageContainerAdmin() throws JMSException {
790            
791            ArrayList l = new ArrayList();
792            for (int i = 0; i < containerManagers.length; i++) {
793                Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins();
794                for (Iterator iter = messageContainerAdmins.values().iterator(); iter.hasNext();) {
795                    MessageContainerAdmin mca = (MessageContainerAdmin) iter.next();
796                    l.add(mca);
797                }
798            }
799            
800            MessageContainerAdmin answer[] = new MessageContainerAdmin[l.size()];
801            l.toArray(answer);
802            return answer;
803        }
804        
805        
806        /**
807         * Add a message to a dead letter queue
808         * @param deadLetterName
809         * @param message
810         * @throws JMSException
811         */
812        public void sendToDeadLetterQueue(String deadLetterName,ActiveMQMessage message) throws JMSException{
813            if (persistentQueueMCM != null){
814                persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, message);
815                log.debug(message + " sent to DLQ: " + deadLetterName);
816            }  
817        }
818    
819        /**
820         * send a message to the broker within a transaction
821        public void sendTransactedMessage(final BrokerClient client, final String transactionId, final ActiveMQMessage message) throws JMSException {
822            getTransactionFor(message).addPostCommitTask(new SendMessageTransactionTask(client, message));
823        }
824         */
825        
826        /**
827         * Acknowledge consumption of a message within a transaction
828        public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException {
829            Transaction transaction;
830            if (ack.isXaTransacted()) {
831                try {
832                    transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
833                }
834                catch (XAException e) {
835                    throw (JMSException) new JMSException(e.getMessage()).initCause(e);
836                }
837            }
838            else {
839                transaction = transactionManager.getLocalTransaction(transactionId);
840            }
841            transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
842            transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));
843    
844            // we need to tell the dispatcher that we can now accept another message
845            // even though we don't really ack the message until the commit
846            // this is because if we have a prefetch value of 1, we can never consume 2 messages
847            // in a transaction, since the ack for the first message never arrives until the commit
848            for (int i = 0; i < containerManagers.length; i++) {
849                containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
850            }
851        }
852         */
853    
854    
855        /**
856         * @param message
857         * @return
858         * @throws JMSException
859        private Transaction getTransactionFor(ActiveMQMessage message) throws JMSException {
860            String transactionId = message.getTransactionId();        
861            if (message.isXaTransacted()) {
862                try {
863                    return transactionManager.getXATransaction(new ActiveMQXid(transactionId));
864                }
865                catch (XAException e) {
866                    throw (JMSException) new JMSException(e.getMessage()).initCause(e);
867                }
868            }
869            return transactionManager.getLocalTransaction(transactionId);
870        }
871    
872    
873        public void acknowledgeMessageRecover(MessageAck ack) {
874        }
875        public void sendMessageRecover(ActiveMQMessage message) throws JMSException {
876        }
877         */
878    
879        /**
880         * Associates a Transaction with the current thread.  Once this call is finished,
881         * the Transactio ncan be obtained via TransactionManager.getContexTransaction().
882         * @param message
883         * @throws JMSException
884         */
885        private final void associateTransaction(ActiveMQMessage message) throws JMSException {
886            Transaction transaction;
887            if( message.isPartOfTransaction() ) {
888                if (message.isXaTransacted()) {
889                    try {
890                        transaction = transactionManager.getXATransaction((ActiveMQXid) message.getTransactionId());
891                    }
892                    catch (XAException e) {
893                        throw (JMSException) new JMSException(e.getMessage()).initCause(e);
894                    }
895                } else {
896                    transaction = transactionManager.getLocalTransaction((String) message.getTransactionId());
897                }
898                
899            } else {
900                transaction = null;
901            }                
902            TransactionManager.setContexTransaction(transaction);
903        }
904    
905        private void disAssociateTransaction() {
906            TransactionManager.setContexTransaction(null);
907        }
908        
909        /**
910         * Associates a Transaction with the current thread.  Once this call is finished,
911         * the Transactio ncan be obtained via TransactionManager.getContexTransaction().
912         * @param ack
913         * @throws JMSException
914         */
915        private void associateTransaction(MessageAck ack) throws JMSException {
916            Transaction transaction;
917            if( ack.isPartOfTransaction() ) {
918                if (ack.isXaTransacted()) {
919                    try {
920                        transaction = transactionManager.getXATransaction((ActiveMQXid) ack.getTransactionId());
921                    }
922                    catch (XAException e) {
923                        throw (JMSException) new JMSException(e.getMessage()).initCause(e);
924                    }
925                } else {
926                    transaction = transactionManager.getLocalTransaction((String) ack.getTransactionId());
927                }
928                
929            } else {
930                transaction = null;
931            }                
932            TransactionManager.setContexTransaction(transaction);
933        }
934        
935        private String sanitizeString(String in) {
936            String result = null;
937            if (in != null) {
938                result = in.replace(':', '_');
939                result = result.replace('/', '_');
940                result = result.replace('\\', '_');
941            }
942            return result;
943        }
944    
945        /**
946         * @return Returns the memoryManager.
947         */
948        public MemoryBoundedObjectManager getMemoryManager() {
949            return memoryManager;
950        }
951    
952    
953        /**
954         * @return Returns the queueManager.
955         */
956        public MemoryBoundedQueueManager getQueueManager() {
957            return queueManager;
958        }
959        
960        
961        public String getName() {
962            return getBrokerName();
963        }
964        public String toString (){
965            return "broker: " + getName();
966        }
967    
968        
969        
970    
971    }