/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.activemq.broker.impl;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.transaction.xa.XAException;
import org.activemq.broker.Broker;
import org.activemq.broker.BrokerAdmin;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.ConsumerInfoListener;
import org.activemq.capacity.DelegateCapacityMonitor;
import org.activemq.io.util.MemoryBoundedObjectManager;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.jndi.ReadOnlyContext;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.ConnectionInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.message.ProducerInfo;
import org.activemq.security.SecurityAdapter;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainerAdmin;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager;
import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
import org.activemq.service.impl.DurableTopicMessageContainerManager;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.PersistenceAdapterFactory;
import org.activemq.store.TransactionStore;
import org.activemq.store.vm.VMPersistenceAdapter;
import org.activemq.store.vm.VMTransactionManager;
import org.activemq.util.Callback;
import org.activemq.util.ExceptionTemplate;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;

/**
 * The default {@link Broker} implementation.
 * <p>
 * The broker owns a persistence adapter, a transaction manager and a set of
 * {@link MessageContainerManager}s (held in a name -> manager map and cached
 * as an array). Most operations simply fan out to every container manager.
 * Components that have not been injected through a setter are created with
 * defaults when {@link #start()} is called.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class DefaultBroker extends DelegateCapacityMonitor implements Broker, BrokerAdmin {

    private static final Log log = LogFactory.getLog(DefaultBroker.class);

    /** System property naming the directory used by the default message store. */
    protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
    /** System property naming the {@link PersistenceAdapterFactory} class to instantiate. */
    protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory";

    protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};

    private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb

    private PersistenceAdapter persistenceAdapter;
    private TransactionManager transactionManager;
    /** Array snapshot of {@link #containerManagerMap}'s values; rebuilt lazily when null. */
    private MessageContainerManager[] containerManagers;
    private File tempDir;
    private MemoryBoundedObjectManager memoryManager;
    private MemoryBoundedQueueManager queueManager;
    private TransactionStore preparedTransactionStore;
    private final String brokerName;
    private final String brokerClusterName;
    private Map containerManagerMap;
    private CopyOnWriteArrayList consumerInfoListeners;
    private MessageContainerManager persistentTopicMCM;
    private MessageContainerManager transientTopicMCM;
    private MessageContainerManager transientQueueMCM;
    private DurableQueueBoundedMessageManager persistentQueueMCM;
    private SecurityAdapter securityAdapter;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;
    private AdvisorySupport advisory;
    /** Active consumers: ConsumerInfo -> BrokerClient. */
    private Map messageConsumers = new ConcurrentHashMap();

    /**
     * Creates a broker that uses the given memory manager both as its capacity
     * delegate and as the backing manager for its bounded queues.
     *
     * @param brokerName        the name of this broker
     * @param brokerClusterName the name of the cluster this broker belongs to
     * @param memoryManager     the memory manager to use
     */
    public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager) {
        this.brokerName = brokerName;
        this.brokerClusterName = brokerClusterName;
        this.memoryManager = memoryManager;
        queueManager = new MemoryBoundedQueueManager(memoryManager);
        setDelegate(memoryManager);
        containerManagerMap = new ConcurrentHashMap();
        consumerInfoListeners = new CopyOnWriteArrayList();
        this.advisory = new AdvisorySupport(this);
    }

    /**
     * Creates a broker in the "default" cluster using the given memory manager.
     */
    public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) {
        this(brokerName, "default", memoryManager);
    }

    /**
     * Creates a broker in the given cluster with a default-sized (20mb) memory manager.
     */
    public DefaultBroker(String brokerName, String cluserName) {
        this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
    }

    /**
     * Creates a broker in the "default" cluster with a default-sized (20mb) memory manager.
     */
    public DefaultBroker(String brokerName) {
        this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
    }

    /**
     * Creates a broker in the given cluster that uses the supplied persistence adapter
     * and a default-sized (20mb) memory manager.
     */
    public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
        this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
        this.persistenceAdapter = persistenceAdapter;
    }

    /**
     * Creates a broker in the "default" cluster that uses the supplied persistence adapter.
     */
    public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
        this(brokerName);
        this.persistenceAdapter = persistenceAdapter;
    }

    /**
     * Start this Service. Any policy, persistence adapter or transaction manager that
     * has not been set is created with a default, then the persistence adapter, every
     * container manager and finally the transaction manager are started.
     *
     * @throws JMSException if a component fails to be created or started
     */
    public void start() throws JMSException {
        if (redeliveryPolicy == null) {
            redeliveryPolicy = new RedeliveryPolicy();
        }
        if (deadLetterPolicy == null) {
            deadLetterPolicy = new DeadLetterPolicy(this);
        }
        if (persistenceAdapter == null) {
            persistenceAdapter = createPersistenceAdapter();
        }
        persistenceAdapter.start();

        if (transactionManager == null) {
            preparedTransactionStore = persistenceAdapter.createTransactionStore();
            transactionManager = new VMTransactionManager(this, preparedTransactionStore);
        }

        // force containers to be created
        if (containerManagerMap.isEmpty()) {
            makeDefaultContainerManagers();
            // An array snapshot may already have been cached from the (then empty)
            // map by an earlier getContainerManagers() call; drop it so that the
            // managers just registered are picked up and started below.
            containerManagers = null;
        }
        MessageContainerManager[] managers = getContainerManagers();

        for (int i = 0; i < managers.length; i++) {
            managers[i].setDeadLetterPolicy(this.deadLetterPolicy);
            managers[i].start();
        }
        transactionManager.start();
    }

    /**
     * Stop this Service. Every component is asked to stop even if an earlier one
     * fails; failures are collected by an {@link ExceptionTemplate} and reported
     * once all components have been processed. Components that were never
     * created (broker not started) are skipped.
     *
     * @throws JMSException if any component failed to stop
     */
    public void stop() throws JMSException {
        ExceptionTemplate template = new ExceptionTemplate();

        if (containerManagers != null) {
            for (int i = 0; i < containerManagers.length; i++) {
                final MessageContainerManager containerManager = containerManagers[i];
                template.run(new Callback() {
                    public void execute() throws Throwable {
                        containerManager.stop();
                    }
                });
            }
        }
        if (transactionManager != null) {
            template.run(new Callback() {
                public void execute() throws Throwable {
                    transactionManager.stop();
                }
            });
        }
        // The adapter is only created in start() unless injected, so it may be absent.
        if (persistenceAdapter != null) {
            template.run(new Callback() {
                public void execute() throws Throwable {
                    persistenceAdapter.stop();
                }
            });
        }

        template.throwJMSException();
    }

    // Broker interface
    //-------------------------------------------------------------------------

    /**
     * Registers a client connection: authorizes it with the security adapter (if one
     * is configured) and notifies the advisory support.
     */
    public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
        if (securityAdapter != null) {
            securityAdapter.authorizeConnection(client, info);
        }
        advisory.addConnection(client, info);
    }

    /**
     * Removes a client connection: lets the transaction manager (if any) clean up
     * after the client and notifies the advisory support.
     */
    public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
        if (transactionManager != null) {
            transactionManager.cleanUpClient(client);
        }
        advisory.removeConnection(client, info);
    }

    /**
     * Registers a message producer after authorizing it with the security adapter
     * (if one is configured).
     */
    public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
        if (securityAdapter != null) {
            securityAdapter.authorizeProducer(client, info);
        }
        advisory.addProducer(client, info);
    }

    /**
     * Unregisters a message producer from the advisory support.
     */
    public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
        advisory.removeProducer(client, info);
    }

    /**
     * Add an active message consumer. The consumer is validated, authorized (if a
     * security adapter is configured), registered with every container manager,
     * announced to the {@link ConsumerInfoListener}s and finally recorded as active.
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        validateConsumer(info);
        if (securityAdapter != null) {
            securityAdapter.authorizeConsumer(client, info);
        }
        advisory.addAdvisory(client, info);
        MessageContainerManager[] array = getContainerManagers();
        for (int i = 0; i < array.length; i++) {
            array[i].addMessageConsumer(client, info);
        }
        fireConsumerInfo(client, info);
        messageConsumers.put(info, client);
    }

    /**
     * Remove an active message consumer from every container manager, announce the
     * change to the {@link ConsumerInfoListener}s and forget the consumer.
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        validateConsumer(info);
        advisory.removeAdvisory(client, info);
        MessageContainerManager[] managers = getContainerManagers();
        for (int i = 0; i < managers.length; i++) {
            managers[i].removeMessageConsumer(client, info);
        }
        fireConsumerInfo(client, info);
        messageConsumers.remove(info);
    }

    /**
     * Send a message to the broker. The message's transaction (if any) is associated
     * with the current thread for the duration of the send. A composite destination
     * is expanded: the message is sent once per child destination, using a shallow
     * copy for every child after the first.
     *
     * @throws JMSException if the broker has not been started, the message has no
     *                      destination, or a non-advisory message has no message id
     */
    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        checkValid();
        ActiveMQDestination destination = message.getJMSActiveMQDestination();
        if (destination == null) {
            throw new JMSException("No destination specified for the Message");
        }
        if (message.getJMSMessageID() == null && !destination.isAdvisory()) {
            throw new JMSException("No messageID specified for the Message");
        }
        associateTransaction(message);
        try {
            if (destination.isComposite()) {
                boolean first = true;
                for (Iterator iter = destination.getChildDestinations().iterator(); iter.hasNext();) {
                    ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
                    // lets shallow copy just in case
                    if (first) {
                        first = false;
                    }
                    else {
                        message = message.shallowCopy();
                    }
                    message.setJMSDestination(childDestination);
                    doMessageSend(client, message);
                }
            }
            else {
                if (destination.isTempDestinationAdvisory() && !client.isBrokerConnection()) {
                    advisory.processTempDestinationAdvisory(client, message);
                }
                doMessageSend(client, message);
            }
        }
        finally {
            disAssociateTransaction();
        }
    }

    /**
     * Acknowledge consumption of a message by the Message Consumer. The ack's
     * transaction (if any) is associated with the current thread while every
     * container manager processes the acknowledgement.
     */
    public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
        associateTransaction(ack);
        try {
            MessageContainerManager[] managers = getContainerManagers();
            for (int i = 0; i < managers.length; i++) {
                managers[i].acknowledgeMessage(client, ack);
            }
        }
        finally {
            disAssociateTransaction();
        }
    }

    /**
     * Asks every container manager to delete the named subscription of the given client id.
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
        MessageContainerManager[] managers = getContainerManagers();
        for (int i = 0; i < managers.length; i++) {
            managers[i].deleteSubscription(clientId, subscriberName);
        }
    }

    /**
     * Start a transaction.
     *
     * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, java.lang.String)
     */
    public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
        transactionManager.createLocalTransaction(client, transactionId);
    }

    /**
     * Commit a local transaction (as a one-phase commit).
     *
     * @throws JMSException if the commit fails; an {@link XAException} is wrapped as the cause
     */
    public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
        try {
            Transaction transaction = transactionManager.getLocalTransaction(transactionId);
            transaction.commit(true);
        }
        catch (XAException e) {
            // TODO: I think the XAException should propagate all the way to the client.
            throw toJMSException(e);
        }
    }

    /**
     * Rollback a local transaction.
     *
     * @throws JMSException if the rollback fails; an {@link XAException} is wrapped as the cause
     */
    public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
        try {
            Transaction transaction = transactionManager.getLocalTransaction(transactionId);
            transaction.rollback();
        }
        catch (XAException e) {
            // TODO: I think the XAException should propagate all the way to the client.
            throw toJMSException(e);
        }
    }

    /**
     * Starts an XA Transaction.
     *
     * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
     */
    public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
        transactionManager.createXATransaction(client, xid);
    }

    /**
     * Prepares an XA Transaction.
     *
     * @see org.activemq.broker.Broker#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
     */
    public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
        Transaction transaction = transactionManager.getXATransaction(xid);
        return transaction.prepare();
    }

    /**
     * Rollback an XA Transaction.
     *
     * @see org.activemq.broker.Broker#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
     */
    public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
        Transaction transaction = transactionManager.getXATransaction(xid);
        transaction.rollback();
    }

    /**
     * Commit an XA Transaction.
     *
     * @see org.activemq.broker.Broker#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
     */
    public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
        Transaction transaction = transactionManager.getXATransaction(xid);
        transaction.commit(onePhase);
    }

    /**
     * Gets the prepared XA transactions.
     *
     * @see org.activemq.broker.Broker#getPreparedTransactions(org.activemq.broker.BrokerClient)
     */
    public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
        return transactionManager.getPreparedXATransactions();
    }

    // Properties
    //-------------------------------------------------------------------------

    /**
     * Get a temp directory - used for spooling. If none has been set, it is taken
     * from the "activemq.store.tempdir" system property, defaulting to "ActiveMQTemp".
     *
     * @return a File ptr to the directory
     */
    public File getTempDir() {
        if (tempDir == null) {
            String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
            tempDir = new File(dirName);
        }
        return tempDir;
    }

    public String getBrokerName() {
        return brokerName;
    }

    /**
     * @return Returns the brokerClusterName.
     */
    public String getBrokerClusterName() {
        return brokerClusterName;
    }

    public void setTempDir(File tempDir) {
        this.tempDir = tempDir;
    }

    /**
     * @return the container managers as an array, built lazily from the
     *         container manager map and cached until the map is replaced
     */
    public MessageContainerManager[] getContainerManagers() {
        if (containerManagers == null) {
            containerManagers = createContainerManagers();
        }
        return containerManagers;
    }

    public Map getContainerManagerMap() {
        return containerManagerMap;
    }

    /**
     * Replaces the name -> container manager map and discards the cached array so
     * that it is rebuilt on the next {@link #getContainerManagers()} call.
     */
    public void setContainerManagerMap(Map containerManagerMap) {
        this.containerManagerMap = containerManagerMap;
        this.containerManagers = null;
    }

    public PersistenceAdapter getPersistenceAdapter() {
        return persistenceAdapter;
    }

    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
        this.persistenceAdapter = persistenceAdapter;
    }

    public TransactionManager getTransactionManager() {
        return transactionManager;
    }

    public void setTransactionManager(TransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public SecurityAdapter getSecurityAdapter() {
        return securityAdapter;
    }

    public void setSecurityAdapter(SecurityAdapter securityAdapter) {
        this.securityAdapter = securityAdapter;
    }

    public RedeliveryPolicy getRedeliveryPolicy() {
        return redeliveryPolicy;
    }

    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    public TransactionStore getPreparedTransactionStore() {
        return preparedTransactionStore;
    }

    public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) {
        this.preparedTransactionStore = preparedTransactionStore;
    }

    /**
     * @return the DeadLetterPolicy
     */
    public DeadLetterPolicy getDeadLetterPolicy() {
        return deadLetterPolicy;
    }

    /**
     * set the dead letter policy
     *
     * @param deadLetterPolicy
     */
    public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
        this.deadLetterPolicy = deadLetterPolicy;
    }

    /**
     * @return Returns the maximumMemoryUsage.
     */
    public long getMaximumMemoryUsage() {
        return memoryManager.getValueLimit();
    }

    /**
     * @param maximumMemoryUsage The maximumMemoryUsage to set.
     */
    public void setMaximumMemoryUsage(long maximumMemoryUsage) {
        this.memoryManager.setValueLimit(maximumMemoryUsage);
    }

    /**
     * Builds a read-only JNDI context with one sub-context per container manager,
     * keyed by the manager's name in the container manager map and exposing that
     * manager's destinations.
     *
     * @param environment the JNDI environment passed to each created context
     * @return the read-only context
     */
    public Context getDestinationContext(Hashtable environment) {
        Map data = new ConcurrentHashMap();
        for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) {
            Map.Entry entry = (Map.Entry) iter.next();
            String name = entry.getKey().toString();
            MessageContainerManager manager = (MessageContainerManager) entry.getValue();
            Context context = new ReadOnlyContext(environment, manager.getDestinations());
            data.put(name, context);
        }
        return new ReadOnlyContext(environment, data);
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Authorizes (if a security adapter is configured) and dispatches a message.
     * Topic messages always go to the transient topic manager, preceded by the
     * persistent topic manager when the message is persistent and the destination
     * is not temporary. Queue messages are handed to the transient queue manager
     * and then to the persistent queue manager.
     */
    protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
        if (securityAdapter != null) {
            securityAdapter.authorizeSendMessage(client, message);
        }
        ActiveMQDestination dest = message.getJMSActiveMQDestination();
        if (dest.isTopic()) {
            if (message.isPersistent() && !dest.isTemporary()) {
                persistentTopicMCM.sendMessage(client, message);
            }
            transientTopicMCM.sendMessage(client, message);
        }
        else {
            transientQueueMCM.sendMessage(client, message);
            persistentQueueMCM.sendMessage(client, message);
        }
    }

    /**
     * Factory method to create a default persistence adapter. The factory class named
     * by the {@link #PERSISTENCE_ADAPTER_FACTORY} system property is tried first (its
     * failures are reported), then the default factory (its failures are tolerated),
     * and finally a RAM based adapter is used.
     *
     * @return the persistence adapter, never null
     * @throws JMSException if the explicitly configured factory cannot be used
     */
    protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
        File directory = new File(getStoreDirectory());

        // lets use reflection to avoid runtime dependency on persistence libraries
        PersistenceAdapter answer = null;
        String property = System.getProperty(PERSISTENCE_ADAPTER_FACTORY);
        if (property != null) {
            answer = tryCreatePersistenceAdapter(property, directory, false);
        }
        if (answer == null) {
            answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true);
        }
        if (answer != null) {
            return answer;
        }
        else {
            log.warn("Default message store (journal+derby) could not be found in the classpath or property '" + PERSISTENCE_ADAPTER_FACTORY
                    + "' not specified so defaulting to use RAM based message persistence");
            return new VMPersistenceAdapter();
        }
    }

    /**
     * Attempts to instantiate the named {@link PersistenceAdapterFactory} and use it
     * to create an adapter for the given directory.
     *
     * @param className    the factory class name
     * @param directory    the store directory handed to the factory
     * @param ignoreErrors when true, a missing class or a non-I/O failure yields
     *                     null instead of an exception; an {@link IOException} from
     *                     the factory is always reported
     * @return the adapter, or null if it could not be created and errors are ignored
     * @throws JMSException if creation fails and the failure is not ignored
     */
    protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
        Class adapterClass = loadClass(className, ignoreErrors);
        if (adapterClass != null) {
            try {
                PersistenceAdapterFactory factory = (PersistenceAdapterFactory) adapterClass.newInstance();
                PersistenceAdapter answer = factory.createPersistenceAdapter(directory, memoryManager);
                log.info("Persistence adapter created using: " + className);
                return answer;
            }
            catch (IOException cause) {
                throw createInstantiateAdapterException(className, cause);
            }
            catch (Throwable e) {
                if (!ignoreErrors) {
                    throw createInstantiateAdapterException(className, e);
                }
                // Best effort: the caller falls back to another adapter, but keep a
                // trace of why this one was rejected.
                log.debug("Ignoring failure to create persistence adapter using: " + className, e);
            }
        }
        return null;
    }

    /**
     * Builds the exception reported when a persistence adapter factory fails.
     */
    protected JMSException createInstantiateAdapterException(String className, Throwable e) {
        return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: "
                + className + ". Reason: " + e, e);
    }

    /**
     * Tries to load the given class from the current context class loader or
     * class loader which loaded us or return null if the class could not be found
     *
     * @param name         the fully qualified class name
     * @param ignoreErrors when true a missing class yields null, otherwise an exception
     * @throws JMSException if the class cannot be found and errors are not ignored
     */
    protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
        // A thread is not required to have a context class loader.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            try {
                return contextLoader.loadClass(name);
            }
            catch (ClassNotFoundException ignored) {
                // fall through and try the class loader which loaded us
            }
        }
        try {
            return getClass().getClassLoader().loadClass(name);
        }
        catch (ClassNotFoundException e) {
            if (ignoreErrors) {
                log.trace("Could not find class: " + name + " on the classpath");
                return null;
            }
            else {
                throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
            }
        }
    }

    /**
     * @return the store directory: the {@link #PROPERTY_STORE_DIRECTORY} system
     *         property if set, otherwise "ActiveMQ" plus the sanitized broker name
     */
    protected String getStoreDirectory() {
        String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(brokerName);
        return System.getProperty(PROPERTY_STORE_DIRECTORY, defaultDirectory);
    }

    /**
     * Factory method to create the container manager array from the values of the
     * container manager map.
     *
     * @return a new array holding every registered container manager
     */
    protected MessageContainerManager[] createContainerManagers() {
        int size = containerManagerMap.size();
        MessageContainerManager[] answer = new MessageContainerManager[size];
        containerManagerMap.values().toArray(answer);
        return answer;
    }

    /**
     * Creates the four default container managers (transient/persistent topics and
     * queues) and registers them in the container manager map.
     */
    protected void makeDefaultContainerManagers() {
        transientTopicMCM = new TransientTopicBoundedMessageManager(queueManager);
        containerManagerMap.put("transientTopicContainer", transientTopicMCM);
        persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy, deadLetterPolicy);
        containerManagerMap.put("persistentTopicContainer", persistentTopicMCM);
        persistentQueueMCM = new DurableQueueBoundedMessageManager(persistenceAdapter, queueManager, redeliveryPolicy, deadLetterPolicy);
        containerManagerMap.put("persistentQueueContainer", persistentQueueMCM);
        transientQueueMCM = new TransientQueueBoundedMessageManager(queueManager, redeliveryPolicy, deadLetterPolicy);
        containerManagerMap.put("transientQueueContainer", transientQueueMCM);
    }

    /**
     * Ensures the consumer is valid, throwing a meaningful exception if not
     *
     * @param info the consumer to check
     * @throws JMSException if the consumer has no consumer id
     */
    protected void validateConsumer(ConsumerInfo info) throws JMSException {
        if (info.getConsumerId() == null) {
            throw new JMSException("No consumerId specified for the ConsumerInfo");
        }
    }

    /**
     * @throws JMSException if the container manager array has not been created yet
     */
    protected void checkValid() throws JMSException {
        if (containerManagers == null) {
            throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
        }
    }

    /**
     * Add a ConsumerInfoListener to the Broker. The listener is immediately told
     * about every consumer that is currently active. A null listener is ignored.
     *
     * @param l the listener to add
     */
    public void addConsumerInfoListener(ConsumerInfoListener l) {
        if (l != null) {
            consumerInfoListeners.add(l);
            //fire any existing infos to the listener
            for (Iterator i = messageConsumers.entrySet().iterator(); i.hasNext();) {
                Map.Entry entry = (Map.Entry) i.next();
                ConsumerInfo info = (ConsumerInfo) entry.getKey();
                BrokerClient client = (BrokerClient) entry.getValue();
                l.onConsumerInfo(client, info);
            }
        }
    }

    /**
     * Remove a ConsumerInfoListener from the Broker
     *
     * @param l the listener to remove
     */
    public void removeConsumerInfoListener(ConsumerInfoListener l) {
        consumerInfoListeners.remove(l);
    }

    /**
     * Notifies every registered {@link ConsumerInfoListener} about the given consumer.
     */
    protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
        for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) {
            ConsumerInfoListener l = (ConsumerInfoListener) i.next();
            l.onConsumerInfo(client, info);
        }
    }

    /**
     * @return the MessageContainerManager for durable topics
     */
    public MessageContainerManager getPersistentTopicContainerManager() {
        return persistentTopicMCM;
    }

    /**
     * @return the MessageContainerManager for transient topics
     */
    public MessageContainerManager getTransientTopicContainerManager() {
        return transientTopicMCM;
    }

    /**
     * @return the MessageContainerManager for persistent queues
     */
    public MessageContainerManager getPersistentQueueContainerManager() {
        return persistentQueueMCM;
    }

    /**
     * @return the MessageContainerManager for transient queues
     */
    public MessageContainerManager getTransientQueueContainerManager() {
        return transientQueueMCM;
    }

    /**
     * @see org.activemq.broker.Broker#getBrokerAdmin()
     */
    public BrokerAdmin getBrokerAdmin() {
        return this;
    }

    /**
     * Asks every container manager to create a message container for the destination.
     */
    public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
        MessageContainerManager[] managers = getContainerManagers();
        for (int i = 0; i < managers.length; i++) {
            managers[i].createMessageContainer(dest);
        }
    }

    /**
     * Asks every container manager to destroy the message container of the destination.
     */
    public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException {
        MessageContainerManager[] managers = getContainerManagers();
        for (int i = 0; i < managers.length; i++) {
            managers[i].destroyMessageContainer(dest);
        }
    }

    /**
     * @return the first {@link MessageContainerAdmin} any container manager holds
     *         for the destination, or null if there is none
     */
    public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException {
        MessageContainerManager[] managers = getContainerManagers();
        for (int i = 0; i < managers.length; i++) {
            Map messageContainerAdmins = managers[i].getMessageContainerAdmins();
            MessageContainerAdmin mca = (MessageContainerAdmin) messageContainerAdmins.get(dest);
            if (mca != null) {
                return mca;
            }
        }
        return null;
    }

    /**
     * @return the {@link MessageContainerAdmin}s of all container managers
     * @throws JMSException
     * @see org.activemq.broker.BrokerAdmin#listDestinations()
     */
    public MessageContainerAdmin[] listMessageContainerAdmin() throws JMSException {
        ArrayList l = new ArrayList();
        MessageContainerManager[] managers = getContainerManagers();
        for (int i = 0; i < managers.length; i++) {
            l.addAll(managers[i].getMessageContainerAdmins().values());
        }

        MessageContainerAdmin answer[] = new MessageContainerAdmin[l.size()];
        l.toArray(answer);
        return answer;
    }

    /**
     * Add a message to a dead letter queue. Does nothing if no persistent queue
     * manager has been created.
     *
     * @param deadLetterName the name of the dead letter queue
     * @param message        the message to add
     * @throws JMSException
     */
    public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException {
        if (persistentQueueMCM != null) {
            persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, message);
            if (log.isDebugEnabled()) {
                log.debug(message + " sent to DLQ: " + deadLetterName);
            }
        }
    }

    /**
     * Associates a Transaction with the current thread. Once this call is finished,
     * the Transaction can be obtained via TransactionManager.getContexTransaction().
     *
     * @param message the message whose transaction (if any) should be associated
     * @throws JMSException if the transaction cannot be looked up
     */
    private final void associateTransaction(ActiveMQMessage message) throws JMSException {
        Transaction transaction = null;
        if (message.isPartOfTransaction()) {
            transaction = lookupTransaction(message.isXaTransacted(), message.getTransactionId());
        }
        TransactionManager.setContexTransaction(transaction);
    }

    /**
     * Clears the Transaction associated with the current thread.
     */
    private void disAssociateTransaction() {
        TransactionManager.setContexTransaction(null);
    }

    /**
     * Associates a Transaction with the current thread. Once this call is finished,
     * the Transaction can be obtained via TransactionManager.getContexTransaction().
     *
     * @param ack the acknowledgement whose transaction (if any) should be associated
     * @throws JMSException if the transaction cannot be looked up
     */
    private void associateTransaction(MessageAck ack) throws JMSException {
        Transaction transaction = null;
        if (ack.isPartOfTransaction()) {
            transaction = lookupTransaction(ack.isXaTransacted(), ack.getTransactionId());
        }
        TransactionManager.setContexTransaction(transaction);
    }

    /**
     * Looks up a transaction by id: an XA transaction (id is an {@link ActiveMQXid})
     * when xaTransacted is true, otherwise a local transaction (id is a String).
     */
    private Transaction lookupTransaction(boolean xaTransacted, Object transactionId) throws JMSException {
        if (xaTransacted) {
            try {
                return transactionManager.getXATransaction((ActiveMQXid) transactionId);
            }
            catch (XAException e) {
                throw toJMSException(e);
            }
        }
        return transactionManager.getLocalTransaction((String) transactionId);
    }

    /**
     * Wraps an {@link XAException} in a {@link JMSException} carrying the same
     * message and the original exception as its cause.
     */
    private static JMSException toJMSException(XAException e) {
        return (JMSException) new JMSException(e.getMessage()).initCause(e);
    }

    /**
     * Replaces ':', '/' and '\' with '_' so the value can be used as a directory name.
     *
     * @return the sanitized string, or null if the input is null
     */
    private String sanitizeString(String in) {
        String result = null;
        if (in != null) {
            result = in.replace(':', '_');
            result = result.replace('/', '_');
            result = result.replace('\\', '_');
        }
        return result;
    }

    /**
     * @return Returns the memoryManager.
     */
    public MemoryBoundedObjectManager getMemoryManager() {
        return memoryManager;
    }

    /**
     * @return Returns the queueManager.
     */
    public MemoryBoundedQueueManager getQueueManager() {
        return queueManager;
    }

    public String getName() {
        return getBrokerName();
    }

    public String toString() {
        return "broker: " + getName();
    }

}