001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.broker.impl; 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.Set; 025 026 import javax.jms.ExceptionListener; 027 import javax.jms.JMSException; 028 import javax.security.auth.Subject; 029 import javax.transaction.xa.XAException; 030 031 import org.activemq.broker.BrokerAdmin; 032 import org.activemq.broker.BrokerClient; 033 import org.activemq.broker.BrokerConnector; 034 import org.activemq.io.util.SpooledBoundedActiveMQMessageQueue; 035 import org.activemq.message.ActiveMQMessage; 036 import org.activemq.message.ActiveMQXid; 037 import org.activemq.message.BrokerAdminCommand; 038 import org.activemq.message.BrokerInfo; 039 import org.activemq.message.CapacityInfo; 040 import org.activemq.message.CleanupConnectionInfo; 041 import org.activemq.message.ConnectionInfo; 042 import org.activemq.message.ConsumerInfo; 043 import org.activemq.message.DurableUnsubscribe; 044 import org.activemq.message.IntResponseReceipt; 045 import org.activemq.message.KeepAlive; 046 import org.activemq.message.MessageAck; 047 import org.activemq.message.Packet; 048 import org.activemq.message.PacketListener; 049 import org.activemq.message.ProducerInfo; 050 import org.activemq.message.Receipt; 051 import org.activemq.message.ResponseReceipt; 052 import org.activemq.message.SessionInfo; 053 import org.activemq.message.TransactionInfo; 054 import org.activemq.message.XATransactionInfo; 055 import org.activemq.transport.NetworkChannel; 056 import org.activemq.transport.NetworkConnector; 057 import org.activemq.transport.TransportChannel; 058 import org.activemq.util.IdGenerator; 059 import org.apache.commons.logging.Log; 060 import org.apache.commons.logging.LogFactory; 061 062 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 063 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 064 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor; 065 066 /** 067 * A Broker client side proxy representing a JMS Connnection 068 * 069 * @version $Revision: 1.1.1.1 $ 070 */ 071 public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener { 072 073 private static final Log log = LogFactory.getLog(BrokerClientImpl.class); 074 private static final Log commandLog = LogFactory.getLog("org.activemq.broker.CommandTrace"); 075 076 private BrokerConnector brokerConnector; 077 private TransportChannel channel; 078 private ConnectionInfo connectionInfo; 079 private IdGenerator packetIdGenerator; 080 private SynchronizedBoolean closed; 081 private Set activeConsumers; 082 private CopyOnWriteArrayList consumers; 083 private CopyOnWriteArrayList producers; 084 private CopyOnWriteArrayList transactions; 085 private CopyOnWriteArrayList sessions; 086 private SynchronizedBoolean started; 087 private boolean brokerConnection; 088 private boolean clusteredConnection; 089 private String remoteBrokerName; 090 private int capacity = 100; 091 private SpooledBoundedActiveMQMessageQueue spoolQueue; 092 private boolean cleanedUp; 093 private boolean registered; 094 private ArrayList dispatchQueue = new ArrayList(); 095 private Subject subject; 096 private boolean remoteNetworkConnector; 097 098 /** 099 * Default Constructor of BrokerClientImpl 100 */ 101 public BrokerClientImpl() { 102 this.packetIdGenerator = new IdGenerator(); 103 this.closed = new SynchronizedBoolean(false); 104 this.started = new SynchronizedBoolean(false); 105 this.activeConsumers = new HashSet(); 106 this.consumers = new CopyOnWriteArrayList(); 107 this.producers = new CopyOnWriteArrayList(); 108 this.transactions = new CopyOnWriteArrayList(); 109 this.sessions = new CopyOnWriteArrayList(); 110 } 111 112 /** 113 * Initialize the BrokerClient 114 * 115 * @param brokerConnector 116 * @param channel 117 */ 118 public void initialize(BrokerConnector brokerConnector, TransportChannel channel) { 119 this.brokerConnector = brokerConnector; 120 this.channel = channel; 121 this.channel.setPacketListener(this); 122 this.channel.setExceptionListener(this); 123 log.trace("brokerConnectorConnector client initialized"); 124 } 125 126 /** 127 * @return the BrokerConnector this client is associated with 128 */ 129 public BrokerConnector getBrokerConnector() { 130 return this.brokerConnector; 131 } 132 133 /** 134 * @return the connection information for this client 135 */ 136 public ConnectionInfo getConnectionInfo() { 137 return connectionInfo; 138 } 139 140 /** 141 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) 142 */ 143 public void onException(JMSException jmsEx) { 144 log.info("Client disconnected: " + this); 145 log.debug("Disconnect cuase: ", jmsEx); 146 close(); 147 } 148 149 /** 150 * @return pretty print for this brokerConnector-client 151 */ 152 public String toString() { 153 String str = "brokerConnector-client:(" + hashCode() + ") "; 154 str += connectionInfo == null ? "" : connectionInfo.getClientId(); 155 str += ": " + channel; 156 return str; 157 } 158 159 /** 160 * Dispatch an ActiveMQMessage to the end client 161 * 162 * @param message 163 */ 164 public void dispatch(ActiveMQMessage message) { 165 if (!isSlowConsumer()) { 166 dispatchToClient(message); 167 } 168 else { 169 if (spoolQueue == null) { 170 log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer"); 171 String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId(); 172 try { 173 spoolQueue = new SpooledBoundedActiveMQMessageQueue(brokerConnector.getBrokerContainer().getBroker() 174 .getTempDir(), spoolName); 175 final SpooledBoundedActiveMQMessageQueue bpq = spoolQueue; 176 ThreadedExecutor exec = new ThreadedExecutor(); 177 exec.execute(new Runnable() { 178 public void run() { 179 while (!closed.get()) { 180 try { 181 Packet packet = bpq.dequeue(); 182 if (packet != null) { 183 dispatchToClient(packet); 184 } 185 } 186 catch (InterruptedException e) { 187 log.warn("async dispatch got an interupt", e); 188 } 189 catch (JMSException e) { 190 log.error("async dispatch got an problem", e); 191 } 192 } 193 } 194 }); 195 } 196 catch (IOException e) { 197 log.error("Could not create SpooledBoundedQueue for this slow consumer", e); 198 close(); 199 } 200 catch (InterruptedException e) { 201 log.error("Could not create SpooledBoundedQueue for this slow consumer", e); 202 close(); 203 } 204 } 205 if (spoolQueue != null) { 206 try { 207 spoolQueue.enqueue(message); 208 } 209 catch (JMSException e) { 210 log.error( 211 "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer", 212 e); 213 close(); 214 } 215 } 216 } 217 } 218 219 private void dispatchToClient(Packet message) { 220 if (started.get()) { 221 send(message); 222 223 } 224 else { 225 boolean msgSent = false; 226 if (message.isJMSMessage()) { 227 ActiveMQMessage jmsMsg = (ActiveMQMessage) message; 228 if (jmsMsg.getJMSActiveMQDestination().isAdvisory()) { 229 send(message); 230 msgSent = true; 231 } 232 } 233 if (!msgSent) { 234 // If the connection is stopped.. we have to hold the message till it is started. 235 synchronized (started) { 236 dispatchQueue.add(message); 237 } 238 } 239 } 240 } 241 242 /** 243 * @return true if the peer for this Client is itself another Broker 244 */ 245 public boolean isBrokerConnection() { 246 return brokerConnection; 247 } 248 249 /** 250 * @return true id this client is part of a cluster 251 */ 252 public boolean isClusteredConnection() { 253 return clusteredConnection; 254 } 255 256 /** 257 * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0 258 * capacity representing that the peer cannot process any more messages at the current time 259 * 260 * @return 261 */ 262 public int getCapacity() { 263 return capacity; 264 } 265 266 /** 267 * @return the client id of the remote connection 268 */ 269 public String getClientID() { 270 if (connectionInfo != null) { 271 return connectionInfo.getClientId(); 272 } 273 return null; 274 } 275 276 /** 277 * @return the channel used 278 */ 279 public TransportChannel getChannel() { 280 return channel; 281 } 282 283 /** 284 * Get an indication if the peer should be considered as a slow consumer 285 * 286 * @return true id the peer should be considered as a slow consumer 287 */ 288 public boolean isSlowConsumer() { 289 return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing! 290 } 291 292 /** 293 * Consume a Packet from the underlying TransportChannel for processing 294 * 295 * @param packet 296 */ 297 public void consume(Packet packet) { 298 if (packet != null) { 299 300 if( commandLog.isDebugEnabled() ) 301 commandLog.debug("broker for "+getClientID()+" received: "+packet); 302 303 Throwable requestEx = null; 304 boolean failed = false; 305 boolean receiptRequired = packet.isReceiptRequired(); 306 short correlationId = packet.getId(); 307 String brokerName = brokerConnector.getBrokerInfo().getBrokerName(); 308 String clusterName = brokerConnector.getBrokerInfo().getClusterName(); 309 try { 310 if (brokerConnection) { 311 if (remoteBrokerName != null && remoteBrokerName.length() > 0) { 312 packet.addBrokerVisited(remoteBrokerName); //got from the remote broker 313 } 314 packet.addBrokerVisited(brokerName); 315 } 316 if (packet.isJMSMessage()) { 317 ActiveMQMessage message = (ActiveMQMessage) packet; 318 319 if (!brokerConnection) { 320 message.setEntryBrokerName(brokerName); 321 message.setEntryClusterName(clusterName); 322 } 323 consumeActiveMQMessage(message); 324 } 325 else { 326 switch (packet.getPacketType()) { 327 case Packet.ACTIVEMQ_MSG_ACK : { 328 MessageAck ack = (MessageAck) packet; 329 consumeMessageAck(ack); 330 break; 331 } 332 case Packet.XA_TRANSACTION_INFO : { 333 XATransactionInfo info = (XATransactionInfo) packet; 334 consumeXATransactionInfo(info); 335 receiptRequired=info.isReceiptRequired(); 336 break; 337 } 338 case Packet.TRANSACTION_INFO : { 339 TransactionInfo info = (TransactionInfo) packet; 340 consumeTransactionInfo(info); 341 break; 342 } 343 case Packet.CONSUMER_INFO : { 344 ConsumerInfo info = (ConsumerInfo) packet; 345 consumeConsumerInfo(info); 346 break; 347 } 348 case Packet.PRODUCER_INFO : { 349 ProducerInfo info = (ProducerInfo) packet; 350 consumeProducerInfo(info); 351 break; 352 } 353 case Packet.SESSION_INFO : { 354 SessionInfo info = (SessionInfo) packet; 355 consumeSessionInfo(info); 356 break; 357 } 358 case Packet.ACTIVEMQ_CONNECTION_INFO : { 359 ConnectionInfo info = (ConnectionInfo) packet; 360 consumeConnectionInfo(info); 361 break; 362 } 363 case Packet.DURABLE_UNSUBSCRIBE : { 364 DurableUnsubscribe ds = (DurableUnsubscribe) packet; 365 brokerConnector.durableUnsubscribe(this, ds); 366 break; 367 } 368 case Packet.CAPACITY_INFO : { 369 CapacityInfo info = (CapacityInfo) packet; 370 consumeCapacityInfo(info); 371 break; 372 } 373 case Packet.CAPACITY_INFO_REQUEST : { 374 updateCapacityInfo(packet.getId()); 375 break; 376 } 377 case Packet.ACTIVEMQ_BROKER_INFO : { 378 consumeBrokerInfo((BrokerInfo) packet); 379 break; 380 } 381 case Packet.KEEP_ALIVE : { 382 // Ignore as the packet contains no additional information to consume 383 break; 384 } 385 case Packet.BROKER_ADMIN_COMMAND : { 386 consumeBrokerAdminCommand((BrokerAdminCommand) packet); 387 break; 388 } 389 case Packet.CLEANUP_CONNECTION_INFO : { 390 consumeCleanupConnectionInfo((CleanupConnectionInfo) packet); 391 break; 392 } 393 default : { 394 log.warn("Unknown Packet received: " + packet); 395 break; 396 } 397 } 398 } 399 } 400 catch (Throwable e) { 401 requestEx = e; 402 log.warn("caught exception consuming packet: " + packet, e); 403 failed = true; 404 } 405 if (receiptRequired){ 406 sendReceipt(correlationId, requestEx, failed); 407 } 408 } 409 } 410 411 /** 412 * @param cleanupInfo 413 * @throws JMSException 414 */ 415 private void consumeCleanupConnectionInfo(CleanupConnectionInfo cleanupInfo) throws JMSException { 416 try { 417 418 for (Iterator i = consumers.iterator(); i.hasNext();) { 419 ConsumerInfo info = (ConsumerInfo) i.next(); 420 info.setStarted(false); 421 this.brokerConnector.deregisterMessageConsumer(this, info); 422 } 423 for (Iterator i = producers.iterator(); i.hasNext();) { 424 ProducerInfo info = (ProducerInfo) i.next(); 425 info.setStarted(false); 426 this.brokerConnector.deregisterMessageProducer(this, info); 427 } 428 for (Iterator i = sessions.iterator(); i.hasNext();) { 429 SessionInfo info = (SessionInfo) i.next(); 430 info.setStarted(false); 431 this.brokerConnector.deregisterSession(this, info); 432 } 433 for (Iterator i = transactions.iterator(); i.hasNext();) { 434 this.brokerConnector.rollbackTransaction(this, i.next().toString()); 435 } 436 this.brokerConnector.deregisterClient(this, connectionInfo); 437 registered = false; 438 439 } finally { 440 // whatever happens, lets make sure we unregister & clean things 441 // down 442 if (log.isDebugEnabled()) { 443 log.debug(this + " has stopped"); 444 } 445 this.consumers.clear(); 446 this.producers.clear(); 447 this.transactions.clear(); 448 this.sessions.clear(); 449 } 450 451 } 452 453 /** 454 * @param command 455 * @throws JMSException 456 */ 457 private void consumeBrokerAdminCommand(BrokerAdminCommand command) throws JMSException { 458 BrokerAdmin brokerAdmin = brokerConnector.getBrokerContainer().getBroker().getBrokerAdmin(); 459 if (BrokerAdminCommand.CREATE_DESTINATION.equals(command.getCommand())) { 460 brokerAdmin.createMessageContainer(command.getDestination()); 461 } 462 else if (BrokerAdminCommand.DESTROY_DESTINATION.equals(command.getCommand())) { 463 brokerAdmin.destoryMessageContainer(command.getDestination()); 464 } 465 else if (BrokerAdminCommand.EMPTY_DESTINATION.equals(command.getCommand())) { 466 brokerAdmin.getMessageContainerAdmin(command.getDestination()).empty(); 467 } 468 else if (BrokerAdminCommand.SHUTDOWN_SERVER_VM.equals(command.getCommand())) { 469 if (Boolean.getBoolean("enable.vm.shutdown")) { 470 log.info("processing command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM); 471 System.exit(1); 472 } else 473 { 474 log.warn("ignoring command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM + ", enable.vm.shutdown=false"); 475 } 476 } 477 else { 478 throw new JMSException("Broker Admin Command type: " + command.getCommand() + " not recognized."); 479 } 480 } 481 482 /** 483 * Register/deregister MessageConsumer with the Broker 484 * 485 * @param info 486 * @throws JMSException 487 */ 488 public void consumeConsumerInfo(ConsumerInfo info) throws JMSException { 489 String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName(); 490 if (info.isStarted()) { 491 consumers.add(info); 492 if (this.activeConsumers.add(info)) { 493 this.brokerConnector.registerMessageConsumer(this, info); 494 } 495 } 496 else { 497 consumers.remove(info); 498 if (activeConsumers.remove(info)) { 499 this.brokerConnector.deregisterMessageConsumer(this, info); 500 } 501 } 502 } 503 504 /** 505 * Update the peer Connection about the Broker's capacity for messages 506 * 507 * @param capacity 508 */ 509 public void updateBrokerCapacity(int capacity) { 510 CapacityInfo info = new CapacityInfo(); 511 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName()); 512 info.setCapacity(capacity); 513 info.setFlowControlTimeout(getFlowControlTimeout(capacity)); 514 send(info); 515 } 516 517 /** 518 * register with the Broker 519 * 520 * @param info 521 * @throws JMSException 522 */ 523 public void consumeConnectionInfo(ConnectionInfo info) throws JMSException { 524 this.connectionInfo = info; 525 if (info.isClosed()) { 526 try { 527 cleanUp(); 528 if (info.isReceiptRequired()){ 529 sendReceipt(info.getId(), null, false); 530 } 531 info.setReceiptRequired(false); 532 try { 533 Thread.sleep(500); 534 } 535 catch (Throwable e) { 536 } 537 } 538 finally { 539 close(); 540 } 541 } 542 else { 543 if (!registered) { 544 this.brokerConnector.registerClient(this, info); 545 registered = true; 546 } 547 synchronized (started) { 548 //set transport hint 549 if (info.getProperties() != null && info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY) != null){ 550 boolean noDelay = new Boolean(info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY)).booleanValue(); 551 channel.setNoDelay(noDelay); 552 553 } 554 if (!started.get() && info.isStarted()) { 555 started.set(true); 556 // Dispatch any queued 557 log.debug(this + " has started running client version " + info.getClientVersion() 558 + " , wire format = " + info.getWireFormatVersion()); 559 //go through consumers, producers, and sessions - setting their clientId (which might not have been set) 560 for (Iterator i = consumers.iterator();i.hasNext();) { 561 ConsumerInfo ci = (ConsumerInfo) i.next(); 562 ci.setClientId(info.getClientId()); 563 } 564 for (Iterator i = producers.iterator();i.hasNext();) { 565 ProducerInfo pi = (ProducerInfo) i.next(); 566 pi.setClientId(info.getClientId()); 567 } 568 for (Iterator i = sessions.iterator();i.hasNext();) { 569 SessionInfo si = (SessionInfo) i.next(); 570 si.setClientId(info.getClientId()); 571 } 572 for (int i = 0;i < dispatchQueue.size();i++) { 573 ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i); 574 dispatch(msg); 575 } 576 dispatchQueue.clear(); 577 } 578 if (started.get() && !info.isStarted()) { 579 started.set(false); 580 log.debug(this + " has stopped"); 581 } 582 } 583 } 584 } 585 586 /** 587 * start consuming messages 588 * 589 * @throws JMSException 590 */ 591 public void start() throws JMSException { 592 channel.start(); 593 } 594 595 /** 596 * stop consuming messages 597 * 598 * @throws JMSException 599 */ 600 public void stop() throws JMSException { 601 log.trace("Stopping channel: " + channel); 602 channel.stop(); 603 } 604 605 /** 606 * cleanup 607 */ 608 public synchronized void cleanUp() { 609 // we could be called here from 2 different code paths 610 // based on if we get a transport failure or we do a clean shutdown 611 // so lets only run this stuff once 612 if (!cleanedUp) { 613 cleanedUp = true; 614 try { 615 try { 616 for (Iterator i = consumers.iterator();i.hasNext();) { 617 ConsumerInfo info = (ConsumerInfo) i.next(); 618 info.setStarted(false); 619 this.brokerConnector.deregisterMessageConsumer(this, info); 620 } 621 for (Iterator i = producers.iterator();i.hasNext();) { 622 ProducerInfo info = (ProducerInfo) i.next(); 623 info.setStarted(false); 624 this.brokerConnector.deregisterMessageProducer(this, info); 625 } 626 for (Iterator i = sessions.iterator();i.hasNext();) { 627 SessionInfo info = (SessionInfo) i.next(); 628 info.setStarted(false); 629 this.brokerConnector.deregisterSession(this, info); 630 } 631 for (Iterator i = transactions.iterator();i.hasNext();) { 632 this.brokerConnector.rollbackTransaction(this, i.next().toString()); 633 } 634 } 635 finally { 636 // whatever happens, lets make sure we unregister & clean things down 637 if (log.isDebugEnabled()) { 638 log.debug(this + " has stopped"); 639 } 640 this.consumers.clear(); 641 this.producers.clear(); 642 this.transactions.clear(); 643 this.sessions.clear(); 644 this.brokerConnector.deregisterClient(this, connectionInfo); 645 registered = false; 646 } 647 } 648 catch (JMSException e) { 649 log.warn("failed to de-register Broker client: " + e, e); 650 } 651 } 652 else { 653 log.debug("We are ignoring a duplicate cleanup() method called for: " + this); 654 } 655 } 656 657 // Implementation methods 658 //------------------------------------------------------------------------- 659 protected void send(Packet packet) { 660 if (!closed.get()) { 661 try { 662 if (brokerConnection) { 663 String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName(); 664 packet.addBrokerVisited(brokerName); 665 if (packet.hasVisited(remoteBrokerName)) { 666 if (log.isDebugEnabled()) { 667 log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: " 668 + packet); 669 } 670 return; 671 } 672 } 673 packet.setId(this.packetIdGenerator.getNextShortSequence()); 674 if( commandLog.isDebugEnabled() ) 675 commandLog.debug("broker for "+getClientID()+" sending: "+packet); 676 this.channel.asyncSend(packet); 677 } 678 catch (JMSException e) { 679 log.warn(this + " caught exception ", e); 680 close(); 681 } 682 } 683 } 684 685 /** 686 * validate the connection 687 * @param timeout 688 * @throws JMSException 689 */ 690 public void validateConnection(int timeout) throws JMSException { 691 KeepAlive packet = new KeepAlive(); 692 packet.setReceiptRequired(true); 693 packet.setId(this.packetIdGenerator.getNextShortSequence()); 694 // In most cases, if the transport is dead due to network errors 695 // the network error will be recognised immediately and an exception 696 // thrown. If the duplicate client ids are due to misconfiguration, 697 // we make sure that we do not terminate the "right" connection 698 // prematurely by using a long timeout here. If the existing client 699 // is working heavily and/or over a slow link, it might take some time 700 // for it to respond. In such a case, the new client is misconfigured 701 // and can wait for a while before being kicked out. 702 703 Receipt r = getChannel().send(packet, timeout); 704 if (r == null) throw new JMSException("Client did not respond in time"); 705 706 } 707 708 protected void close() { 709 if (closed.commit(false, true)) { 710 this.channel.stop(); 711 log.debug(this + " has closed"); 712 } 713 } 714 715 /** 716 * Send message to Broker 717 * 718 * @param message 719 * @throws JMSException 720 */ 721 private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException { 722 this.brokerConnector.sendMessage(this, message); 723 } 724 725 /** 726 * Send Message acknowledge to the Broker 727 * 728 * @param ack 729 * @throws JMSException 730 */ 731 private void consumeMessageAck(MessageAck ack) throws JMSException { 732 this.brokerConnector.acknowledgeMessage(this, ack); 733 } 734 735 /** 736 * Handle transaction start/commit/rollback 737 * 738 * @param info 739 * @throws JMSException 740 */ 741 private void consumeTransactionInfo(TransactionInfo info) throws JMSException { 742 if (info.getType() == TransactionInfo.START) { 743 transactions.add(info.getTransactionId()); 744 this.brokerConnector.startTransaction(this, info.getTransactionId()); 745 } 746 else { 747 if (info.getType() == TransactionInfo.ROLLBACK) { 748 this.brokerConnector.rollbackTransaction(this, info.getTransactionId()); 749 } 750 else if (info.getType() == TransactionInfo.COMMIT) { 751 this.brokerConnector.commitTransaction(this, info.getTransactionId()); 752 } 753 transactions.remove(info.getTransactionId()); 754 } 755 } 756 757 /** 758 * Handle XA transaction start/prepare/commit/rollback 759 * 760 * @param info 761 * @throws JMSException 762 * @throws XAException 763 */ 764 private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException { 765 if (info.getType() == XATransactionInfo.START) { 766 this.brokerConnector.startTransaction(this, info.getXid()); 767 } 768 else if (info.getType() == XATransactionInfo.XA_RECOVER) { 769 ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this); 770 if( info.isReceiptRequired()) { 771 // We will be sending our own receipt.. 772 info.setReceiptRequired(false); 773 // Send the receipt.. 774 ResponseReceipt receipt = new ResponseReceipt(); 775 receipt.setCorrelationId(info.getId()); 776 receipt.setResult(rc); 777 send(receipt); 778 } 779 } 780 else if (info.getType() == XATransactionInfo.GET_RM_ID) { 781 String rc = this.brokerConnector.getResourceManagerId(this); 782 if( info.isReceiptRequired()) { 783 // We will be sending our own receipt.. 784 info.setReceiptRequired(false); 785 // Send the receipt.. 786 ResponseReceipt receipt = new ResponseReceipt(); 787 receipt.setId(this.packetIdGenerator.getNextShortSequence()); 788 receipt.setCorrelationId(info.getId()); 789 receipt.setResult(rc); 790 send(receipt); 791 } 792 } 793 else if (info.getType() == XATransactionInfo.END) { 794 // we don't do anything.. 795 } 796 else { 797 if (info.getType() == XATransactionInfo.PRE_COMMIT) { 798 int rc = this.brokerConnector.prepareTransaction(this, info.getXid()); 799 // We will be sending our own receipt.. 800 if( info.isReceiptRequired()) { 801 info.setReceiptRequired(false); 802 // Send the receipt.. 803 IntResponseReceipt receipt = new IntResponseReceipt(); 804 receipt.setId(this.packetIdGenerator.getNextShortSequence()); 805 receipt.setCorrelationId(info.getId()); 806 receipt.setResult(rc); 807 send(receipt); 808 } 809 } 810 else if (info.getType() == XATransactionInfo.ROLLBACK) { 811 this.brokerConnector.rollbackTransaction(this, info.getXid()); 812 } 813 else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) { 814 this.brokerConnector.commitTransaction(this, info.getXid(), true); 815 } 816 else if (info.getType() == XATransactionInfo.COMMIT) { 817 this.brokerConnector.commitTransaction(this, info.getXid(), false); 818 } 819 else { 820 throw new JMSException("Packet type: " + info.getType() + " not recognized."); 821 } 822 } 823 } 824 825 /** 826 * register/deregister MessageProducer in the Broker 827 * 828 * @param info 829 * @throws JMSException 830 */ 831 private void consumeProducerInfo(ProducerInfo info) throws JMSException { 832 if (info.isStarted()) { 833 producers.add(info); 834 this.brokerConnector.registerMessageProducer(this, info); 835 } 836 else { 837 producers.remove(info); 838 this.brokerConnector.deregisterMessageProducer(this, info); 839 } 840 } 841 842 /** 843 * register/deregister Session in a Broker 844 * 845 * @param info 846 * @throws JMSException 847 */ 848 private void consumeSessionInfo(SessionInfo info) throws JMSException { 849 if (info.isStarted()) { 850 sessions.add(info); 851 this.brokerConnector.registerSession(this, info); 852 } 853 else { 854 sessions.remove(info); 855 this.brokerConnector.deregisterSession(this, info); 856 } 857 } 858 859 /** 860 * Update capacity for the peer 861 * 862 * @param info 863 */ 864 private void consumeCapacityInfo(CapacityInfo info) { 865 this.capacity = info.getCapacity(); 866 } 867 868 private void updateCapacityInfo(short correlationId) { 869 CapacityInfo info = new CapacityInfo(); 870 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName()); 871 info.setCorrelationId(correlationId); 872 info.setCapacity(this.brokerConnector.getBrokerCapacity()); 873 info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity())); 874 send(info); 875 } 876 877 private long getFlowControlTimeout(int capacity) { 878 long result = -1; 879 if (capacity <= 0) { 880 result = 10000; 881 } 882 else if (capacity <= 10) { 883 result = 1000; 884 } 885 else if (capacity <= 20) { 886 result = 10; 887 } 888 return result; 889 } 890 891 private void consumeBrokerInfo(final BrokerInfo info) { 892 brokerConnection = true; 893 started.set(true); 894 remoteBrokerName = info.getBrokerName(); 895 if (remoteBrokerName == null || remoteBrokerName.length() == 0) { 896 log.warn("No remote broker name available!"); 897 } 898 else { 899 if (log.isDebugEnabled()) { 900 log.debug("Received broker info from: " + remoteBrokerName + " on client: " + channel); 901 } 902 } 903 String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName(); 904 if (clusterName.equals(info.getClusterName())) { 905 clusteredConnection = true; 906 } 907 if (!remoteNetworkConnector && info.isRemote()) { 908 try { 909 final NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer()); 910 networkConnector.getThreadPool().execute(new Runnable() { 911 public void run() { 912 try { 913 NetworkChannel networkChannel = new NetworkChannel(networkConnector, brokerConnector 914 .getBrokerContainer(), channel, info.getBrokerName(), info.getClusterName()); 915 networkConnector.addNetworkChannel(networkChannel); 916 brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector); 917 networkConnector.start(); 918 } 919 catch (JMSException e) { 920 log.error("Failed to create reverse remote channel", e); 921 } 922 } 923 }); 924 log.info("Started reverse remote channel to " + remoteBrokerName); 925 remoteNetworkConnector = true; 926 } 927 catch (InterruptedException e) { 928 log.error("Failed to create reverse remote channel", e); 929 } 930 } 931 } 932 933 934 private void sendReceipt(short correlationId, Throwable requestEx, boolean failed) { 935 Receipt receipt = new Receipt(); 936 receipt.setCorrelationId(correlationId); 937 receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName()); 938 receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName()); 939 receipt.setException(requestEx); 940 receipt.setFailed(failed); 941 send(receipt); 942 } 943 944 /** 945 * @param subject 946 */ 947 public void setSubject(Subject subject) { 948 this.subject = subject; 949 } 950 951 /** 952 * @return the subject 953 */ 954 public Subject getSubject() { 955 return subject; 956 } 957 }