View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.broker.impl;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.broker.Broker;
24  import org.codehaus.activemq.broker.BrokerClient;
25  import org.codehaus.activemq.capacity.DelegateCapacityMonitor;
26  import org.codehaus.activemq.message.ActiveMQMessage;
27  import org.codehaus.activemq.message.ActiveMQXid;
28  import org.codehaus.activemq.message.ConsumerInfo;
29  import org.codehaus.activemq.message.MessageAck;
30  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
31  import org.codehaus.activemq.service.MessageContainerManager;
32  import org.codehaus.activemq.service.Transaction;
33  import org.codehaus.activemq.service.TransactionManager;
34  import org.codehaus.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
35  import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
36  import org.codehaus.activemq.service.impl.MessageAckTransactionTask;
37  import org.codehaus.activemq.service.impl.QueueMessageContainerManager;
38  import org.codehaus.activemq.service.impl.RedeliverMessageTransactionTask;
39  import org.codehaus.activemq.service.impl.SendMessageTransactionTask;
40  import org.codehaus.activemq.store.PersistenceAdapter;
41  import org.codehaus.activemq.store.PreparedTransactionStore;
42  import org.codehaus.activemq.store.vm.VMPersistenceAdapter;
43  import org.codehaus.activemq.store.vm.VMTransactionManager;
44  import org.codehaus.activemq.util.Callback;
45  import org.codehaus.activemq.util.ExceptionTemplate;
46  import org.codehaus.activemq.util.JMSExceptionHelper;
47  
48  import javax.jms.JMSException;
49  import javax.transaction.xa.XAException;
50  import java.io.File;
51  import java.lang.reflect.InvocationTargetException;
52  import java.lang.reflect.Method;
53  
54  /***
55   * The default {@link Broker} implementation
56   *
57   * @version $Revision: 1.11 $
58   */
59  public class DefaultBroker extends DelegateCapacityMonitor implements Broker {
60  
61      private static final Log log = LogFactory.getLog(DefaultBroker.class);
62  
63      protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
64      protected static final String PERSISTENCE_ADAPTER_PROPERTY = "activemq.persistenceAdapter";
65  
66      protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};
67  
68      private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb
69  
70      private PersistenceAdapter persistenceAdapter;
71      private TransactionManager transactionManager;
72      private MessageContainerManager[] containerManagers;
73      private File tempDir;
74      private MemoryBoundedQueueManager memoryManager;
75      private PreparedTransactionStore preparedTransactionStore;
76      private final String brokerName;
77  
78  
79      public DefaultBroker(String brokerName) {
80          this.brokerName = brokerName;
81          memoryManager = new MemoryBoundedQueueManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE);
82          setDelegate(memoryManager);
83      }
84  
85      public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
86          this(brokerName);
87          this.persistenceAdapter = persistenceAdapter;
88      }
89  
90      /***
91       * Start this Service
92       *
93       * @throws JMSException
94       */
95      public void start() throws JMSException {
96          if (persistenceAdapter == null) {
97              persistenceAdapter = createPersistenceAdapter();
98          }
99          persistenceAdapter.start();
100 
101         if (transactionManager == null) {
102             preparedTransactionStore = persistenceAdapter.createPreparedTransactionStore();
103             transactionManager = new VMTransactionManager(this, preparedTransactionStore);
104         }
105         transactionManager.start();
106 
107         // force containers to be created
108         getContainerManagers();
109 
110 
111         for (int i = 0; i < containerManagers.length; i++) {
112             containerManagers[i].start();
113         }
114     }
115 
116 
117     /***
118      * stop this Service
119      *
120      * @throws JMSException
121      */
122 
123     public void stop() throws JMSException {
124         ExceptionTemplate template = new ExceptionTemplate();
125 
126         for (int i = 0; i < containerManagers.length; i++) {
127             final MessageContainerManager containerManager = containerManagers[i];
128             template.run(new Callback() {
129                 public void execute() throws Throwable {
130                     containerManager.stop();
131                 }
132             });
133         }
134         if (transactionManager != null) {
135             template.run(new Callback() {
136                 public void execute() throws Throwable {
137                     transactionManager.stop();
138                 }
139             });
140         }
141 
142         template.run(new Callback() {
143             public void execute() throws Throwable {
144                 persistenceAdapter.stop();
145             }
146         });
147 
148         template.throwJMSException();
149     }
150 
151     /***
152      * Acknowledge consumption of a message by the Message Consumer
153      *
154      * @param client
155      * @param ack
156      * @throws JMSException
157      */
158     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
159         for (int i = 0; i < containerManagers.length; i++) {
160             containerManagers[i].acknowledgeMessage(client, ack);
161         }
162     }
163 
164     /***
165      * Acknowledge consumption of a message within a transaction
166      *
167      * @param client
168      * @param transactionId
169      * @param ack
170      */
171     public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException {
172         Transaction transaction;
173         if (ack.isXaTransacted()) {
174             try {
175                 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
176             }
177             catch (XAException e) {
178                 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
179             }
180         }
181         else {
182             transaction = transactionManager.getLocalTransaction(transactionId);
183         }
184         transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
185         transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));
186 
187         // we need to tell the dispatcher that we can now accept another message
188         // even though we don't really ack the message until the commit
189         // this is because if we have a prefetch value of 1, we can never consume 2 messages
190         // in a transaction, since the ack for the first message never arrives until the commit
191         for (int i = 0; i < containerManagers.length; i++) {
192             containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
193         }
194     }
195 
196     /***
197      * send a message to the broker
198      *
199      * @param client
200      * @param message
201      * @throws JMSException
202      */
203     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
204         checkValid();
205         if (message.getJMSMessageID() == null) {
206             throw new JMSException("No messageID specified for the Message");
207         }
208         for (int i = 0; i < containerManagers.length; i++) {
209             containerManagers[i].sendMessage(client, message);
210         }
211     }
212 
213     /***
214      * send a message to the broker within a transaction
215      *
216      * @param client
217      * @param transactionId
218      * @param message
219      */
220     public void sendTransactedMessage(final BrokerClient client, final String transactionId, final ActiveMQMessage message) throws JMSException {
221         Transaction transaction;
222         if (message.isXaTransacted()) {
223             try {
224                 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
225             }
226             catch (XAException e) {
227                 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
228             }
229         }
230         else {
231             transaction = transactionManager.getLocalTransaction(transactionId);
232         }
233 
234         transaction.addPostCommitTask(new SendMessageTransactionTask(client, message));
235     }
236 
237     /***
238      * Add an active message consumer
239      *
240      * @param client
241      * @param info
242      * @throws JMSException
243      */
244     public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
245         validateConsumer(info);
246         MessageContainerManager[] array = getContainerManagers();
247         for (int i = 0; i < array.length; i++) {
248             array[i].addMessageConsumer(client, info);
249         }
250     }
251 
252     /***
253      * remove an active message consumer
254      *
255      * @param client
256      * @param info
257      * @throws JMSException
258      */
259     public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
260         validateConsumer(info);
261         for (int i = 0; i < containerManagers.length; i++) {
262             containerManagers[i].removeMessageConsumer(client, info);
263         }
264     }
265 
266     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
267         for (int i = 0; i < containerManagers.length; i++) {
268             containerManagers[i].redeliverMessage(client, ack);
269         }
270     }
271 
272     /***
273      * Delete a durable subscriber
274      *
275      * @param clientId
276      * @param subscriberName
277      * @throws JMSException if the subscriber doesn't exist or is still active
278      */
279     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
280         for (int i = 0; i < containerManagers.length; i++) {
281             containerManagers[i].deleteSubscription(clientId, subscriberName);
282         }
283     }
284 
285 
286     /***
287      * Start a transaction.
288      *
289      * @see org.codehaus.activemq.broker.Broker#startTransaction(org.codehaus.activemq.broker.BrokerClient, java.lang.String)
290      */
291     public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
292         transactionManager.createLocalTransaction(client, transactionId);
293     }
294 
295     /***
296      * commit a transaction
297      *
298      * @param client
299      * @param transactionId
300      * @throws JMSException
301      */
302     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
303         try {
304             for (int i = 0; i < containerManagers.length; i++) {
305                 containerManagers[i].commitTransaction(client, transactionId);
306             }
307             Transaction transaction = transactionManager.getLocalTransaction(transactionId);
308             transaction.commit(true);
309         }
310         catch (XAException e) {
311             // TODO: I think the XAException should propagate all the way to the client.
312             throw (JMSException) new JMSException(e.getMessage()).initCause(e);
313         }
314     }
315 
316     /***
317      * rollback a transaction
318      *
319      * @param client
320      * @param transactionId
321      */
322     public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
323         try {
324             for (int i = 0; i < containerManagers.length; i++) {
325                 containerManagers[i].rollbackTransaction(client, transactionId);
326             }
327             Transaction transaction = transactionManager.getLocalTransaction(transactionId);
328             transaction.rollback();
329         }
330         catch (XAException e) {
331             // TODO: I think the XAException should propagate all the way to the client.
332             throw (JMSException) new JMSException(e.getMessage()).initCause(e);
333         }
334     }
335 
336     /***
337      * Starts an XA Transaction.
338      *
339      * @see org.codehaus.activemq.broker.Broker#startTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
340      */
341     public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
342         transactionManager.createXATransaction(client, xid);
343     }
344 
345     /***
346      * Prepares an XA Transaciton.
347      *
348      * @see org.codehaus.activemq.broker.Broker#prepareTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
349      */
350     public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
351         Transaction transaction = transactionManager.getXATransaction(xid);
352         return transaction.prepare();
353     }
354 
355     /***
356      * Rollback an XA Transaction.
357      *
358      * @see org.codehaus.activemq.broker.Broker#rollbackTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
359      */
360     public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
361         Transaction transaction = transactionManager.getXATransaction(xid);
362         transaction.rollback();
363     }
364 
365     /***
366      * Commit an XA Transaction.
367      *
368      * @see org.codehaus.activemq.broker.Broker#commitTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid, boolean)
369      */
370     public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
371         Transaction transaction = transactionManager.getXATransaction(xid);
372         transaction.commit(onePhase);
373     }
374 
375     /***
376      * A hint to the broker that an BrokerClient has stopped
377      * This enables the broker to clean-up any outstanding processing
378      * that may be outstanding
379      *
380      * @param client
381      */
382     public void cleanUpClient(BrokerClient client) throws JMSException {
383         if (transactionManager != null) {
384             transactionManager.cleanUpClient(client);
385         }
386     }
387 
388     /***
389      * Gets the prepared XA transactions.
390      *
391      * @see org.codehaus.activemq.broker.Broker#getPreparedTransactions(org.codehaus.activemq.broker.BrokerClient)
392      */
393     public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
394         return transactionManager.getPreparedXATransactions();
395     }
396 
397     // Properties
398     //-------------------------------------------------------------------------
399 
400     /***
401      * Get a temp directory - used for spooling
402      *
403      * @return a File ptr to the directory
404      */
405     public File getTempDir() {
406         if (tempDir == null) {
407             String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
408             tempDir = new File(dirName);
409         }
410         return tempDir;
411     }
412 
413     public String getBrokerName() {
414         return brokerName;
415     }
416 
417     public void setTempDir(File tempDir) {
418         this.tempDir = tempDir;
419     }
420 
421     public MessageContainerManager[] getContainerManagers() {
422         if (containerManagers == null) {
423             containerManagers = createContainerManagers();
424         }
425         return containerManagers;
426     }
427 
428     public void setContainerManagers(MessageContainerManager[] containerManagers) {
429         this.containerManagers = containerManagers;
430     }
431 
432     public PersistenceAdapter getPersistenceAdapter() {
433         return persistenceAdapter;
434     }
435 
436     public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
437         this.persistenceAdapter = persistenceAdapter;
438     }
439 
440     public TransactionManager getTransactionManager() {
441         return transactionManager;
442     }
443 
444     public void setTransactionManager(TransactionManager transactionManager) {
445         this.transactionManager = transactionManager;
446     }
447 
448     public PreparedTransactionStore getPreparedTransactionStore() {
449         return preparedTransactionStore;
450     }
451 
452     public void setPreparedTransactionStore(PreparedTransactionStore preparedTransactionStore) {
453         this.preparedTransactionStore = preparedTransactionStore;
454     }
455 
456     /***
457      * @return Returns the maximumMemoryUsage.
458      */
459     public long getMaximumMemoryUsage() {
460         return memoryManager.getValueLimit();
461     }
462 
463     /***
464      * @param maximumMemoryUsage The maximumMemoryUsage to set.
465      */
466     public void setMaximumMemoryUsage(long maximumMemoryUsage) {
467         this.memoryManager.setValueLimit(maximumMemoryUsage);
468     }
469 
470     
471     // Implementation methods
472     //-------------------------------------------------------------------------
473 
474 
475     /***
476      * Factory method to create a default persistence adapter
477      *
478      * @return
479      */
480     protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
481         File directory = new File(getStoreDirectory());
482 
483         // lets use reflection to avoid runtime dependency on persistence libraries
484         PersistenceAdapter answer = null;
485         String property = System.getProperty(PERSISTENCE_ADAPTER_PROPERTY);
486         if (property != null) {
487             answer = tryCreatePersistenceAdapter(property, directory, false);
488         }
489         if (answer == null) {
490             answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter", directory, true);
491         }
492         if (answer == null) {
493             answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.bdb.BDbPersistenceAdapter", directory, true);
494         }
495         if (answer != null) {
496             return answer;
497         }
498         else {
499             log.warn("Neither JDBM or BDB on the classpath or property '" + PERSISTENCE_ADAPTER_PROPERTY
500                     + "' not specified so defaulting to use RAM based message persistence");
501             return new VMPersistenceAdapter();
502         }
503     }
504 
505     protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
506         Class adapterClass = loadClass(className, ignoreErrors);
507         if (adapterClass != null) {
508 
509             try {
510                 Method method = adapterClass.getMethod("newInstance", NEWINSTANCE_PARAMETER_TYPES);
511                 PersistenceAdapter answer = (PersistenceAdapter) method.invoke(null, new Object[]{directory});
512                 log.info("Using persistence adapter: " + adapterClass.getName());
513                 return answer;
514             }
515             catch (InvocationTargetException e) {
516                 Throwable cause = e.getTargetException();
517                 if (cause != null) {
518                     if (cause instanceof JMSException) {
519                         throw (JMSException) cause;
520                     }
521                     else {
522                         if (cause instanceof Exception) {
523                             throw createInstantiateAdapterException(adapterClass, (Exception) cause);
524                         }
525                     }
526                 }
527                 if (!ignoreErrors) {
528                     throw createInstantiateAdapterException(adapterClass, e);
529                 }
530             }
531             catch (Throwable e) {
532                 if (!ignoreErrors) {
533                     throw createInstantiateAdapterException(adapterClass, e);
534                 }
535             }
536         }
537         return null;
538     }
539 
540     protected JMSException createInstantiateAdapterException(Class adapterClass, Throwable e) {
541         return JMSExceptionHelper.newJMSException("Could not instantiate instance of "
542                 + adapterClass.getName() + ". Reason: " + e, e);
543     }
544 
545     /***
546      * Tries to load the given class from the current context class loader or
547      * class loader which loaded us or return null if the class could not be found
548      */
549     protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
550         try {
551             return Thread.currentThread().getContextClassLoader().loadClass(name);
552         }
553         catch (ClassNotFoundException e) {
554             try {
555                 return getClass().getClassLoader().loadClass(name);
556             }
557             catch (ClassNotFoundException e2) {
558                 if (ignoreErrors) {
559                     log.trace("Could not find class: " + name + " on the classpath");
560                     return null;
561                 }
562                 else {
563                     throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
564                 }
565             }
566         }
567     }
568 
569     protected String getStoreDirectory() {
570         return System.getProperty(PROPERTY_STORE_DIRECTORY, "ActiveMQ");
571     }
572 
573     /***
574      * Factory method to create the default container managers
575      *
576      * @return
577      */
578     protected MessageContainerManager[] createContainerManagers() {
579         MessageContainerManager[] answer = {
580             //new TransientTopicMessageContainerManager(persistenceAdapter),
581             new TransientTopicBoundedMessageManager(memoryManager),
582             new DurableTopicMessageContainerManager(persistenceAdapter),
583             new QueueMessageContainerManager(persistenceAdapter),
584         };
585         return answer;
586     }
587 
588     /***
589      * Ensures the consumer is valid, throwing a meaningful exception if not
590      *
591      * @param info
592      * @throws JMSException
593      */
594     protected void validateConsumer(ConsumerInfo info) throws JMSException {
595         if (info.getConsumerId() == null) {
596             throw new JMSException("No consumerId specified for the ConsumerInfo");
597         }
598     }
599 
600     protected void checkValid() throws JMSException {
601         if (containerManagers == null) {
602             throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
603         }
604     }
605 }