001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport; 020 import java.util.Iterator; 021 import java.util.Map; 022 import javax.jms.JMSException; 023 import javax.jms.Session; 024 import org.apache.commons.logging.Log; 025 import org.apache.commons.logging.LogFactory; 026 import org.activemq.ActiveMQConnection; 027 import org.activemq.ActiveMQConnectionFactory; 028 import org.activemq.ActiveMQPrefetchPolicy; 029 import org.activemq.advisories.ConnectionAdvisor; 030 import org.activemq.advisories.ConnectionAdvisoryEvent; 031 import org.activemq.advisories.ConnectionAdvisoryEventListener; 032 import org.activemq.broker.BrokerClient; 033 import org.activemq.broker.BrokerContainer; 034 import org.activemq.broker.ConsumerInfoListener; 035 import org.activemq.message.ActiveMQDestination; 036 import org.activemq.message.BrokerInfo; 037 import org.activemq.message.ConsumerInfo; 038 import org.activemq.message.Receipt; 039 import org.activemq.service.MessageContainerManager; 040 import org.activemq.service.Service; 041 import org.activemq.transport.composite.CompositeTransportChannel; 042 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 043 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 045 046 /** 047 * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network. <p/> 048 * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are 049 * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local 050 * broker. 051 * 052 * @version $Revision: 1.1.1.1 $ 053 */ 054 public class NetworkChannel 055 implements 056 Service, 057 ConsumerInfoListener, 058 ConnectionAdvisoryEventListener, 059 TransportStatusEventListener { 060 private static final Log log = LogFactory.getLog(NetworkChannel.class); 061 protected String uri; 062 protected BrokerContainer brokerContainer; 063 protected ActiveMQConnection localConnection; 064 protected ActiveMQConnection remoteConnection; 065 protected ConcurrentHashMap topicConsumerMap; 066 protected ConcurrentHashMap queueConsumerMap; 067 protected String remoteUserName; 068 protected String remotePassword; 069 protected String remoteBrokerName; 070 protected String remoteClusterName; 071 protected int maximumRetries = 0; 072 protected long reconnectSleepTime = 2000L; 073 protected PooledExecutor threadPool; 074 private boolean remote = false; 075 private SynchronizedBoolean started = new SynchronizedBoolean(false); 076 private SynchronizedBoolean connected = new SynchronizedBoolean(false); 077 private SynchronizedBoolean stopped = new SynchronizedBoolean(false); 078 private ConnectionAdvisor connectionAdvisor; 079 private ActiveMQPrefetchPolicy localPrefetchPolicy; 080 private ActiveMQPrefetchPolicy remotePrefetchPolicy; 081 082 /** 083 * Default constructor 084 */ 085 public NetworkChannel() { 086 this.topicConsumerMap = new ConcurrentHashMap(); 087 this.queueConsumerMap = new ConcurrentHashMap(); 088 } 089 090 /** 091 * Default Constructor 092 * 093 * @param tp 094 */ 095 public NetworkChannel(PooledExecutor tp) { 096 this(); 097 this.threadPool = tp; 098 } 099 100 /** 101 * Constructor 102 * 103 * @param connector 104 * @param brokerContainer 105 * @param uri 106 */ 107 public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) { 108 this(connector.threadPool); 109 this.brokerContainer = brokerContainer; 110 this.uri = uri; 111 } 112 113 /** 114 * Create a NetworkConnector from a TransportChannel 115 * 116 * @param connector 117 * @param brokerContainer 118 * @param channel 119 * @param remoteBrokerName 120 * @param remoteclusterName 121 * @throws JMSException 122 */ 123 public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel, 124 String remoteBrokerName, String remoteclusterName) throws JMSException { 125 this(connector.threadPool); 126 this.brokerContainer = brokerContainer; 127 this.uri = ""; 128 this.remoteBrokerName = remoteBrokerName; 129 this.remoteClusterName = remoteclusterName; 130 ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(); 131 fac.setJ2EEcompliant(false); 132 fac.setTurboBoost(true); 133 remoteConnection = new ActiveMQConnection(fac, remoteUserName, remotePassword, channel); 134 remoteConnection.setClientID("Boondocks:" + remoteClusterName + ":" + remoteBrokerName); 135 remoteConnection.setQuickClose(true); 136 remoteConnection.start(); 137 BrokerInfo info = new BrokerInfo(); 138 info.setBrokerName(brokerContainer.getBroker().getBrokerName()); 139 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName()); 140 channel.asyncSend(info); 141 remote = true; 142 } 143 144 /** 145 * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent) 146 */ 147 public void statusChanged(TransportStatusEvent event) { 148 if (event != null 149 && (event.getChannelStatus() == TransportStatusEvent.CONNECTED 150 || event.getChannelStatus() == TransportStatusEvent.RECONNECTED)) { 151 connected.set(true); 152 }else { 153 connected.set(false); 154 } 155 } 156 157 private void doSetConnected() { 158 synchronized (connected) { 159 connected.set(true); 160 connected.notifyAll(); 161 } 162 } 163 164 /** 165 * @return text info on this 166 */ 167 public String toString() { 168 return "NetworkChannel{ " + ", uri = '" + uri + "' " + ", remoteBrokerName = '" + remoteBrokerName + "' " 169 + " }"; 170 } 171 172 /** 173 * Start the channel 174 */ 175 public void start() { 176 if (started.commit(false, true)) { 177 try { 178 stopped.set(false); 179 threadPool.execute(new Runnable() { 180 public void run() { 181 String originalName = Thread.currentThread().getName(); 182 try { 183 Thread.currentThread().setName("NetworkChannel Initiator to " + uri); 184 initialize(); 185 startSubscriptions(); 186 log.info("Started NetworkChannel to " + uri); 187 } 188 catch (JMSException jmsEx) { 189 log.error("Failed to start NetworkChannel: " + uri, jmsEx); 190 } 191 finally { 192 Thread.currentThread().setName(originalName); 193 } 194 } 195 }); 196 } 197 catch (InterruptedException e) { 198 log.warn("Failed to start - interuppted", e); 199 } 200 } 201 } 202 203 /** 204 * stop the channel 205 * 206 * @throws JMSException on error 207 */ 208 public void stop() throws JMSException { 209 if (started.commit(true, false)) { 210 stopped.set(true); 211 topicConsumerMap.clear(); 212 if (remoteConnection != null) { 213 remoteConnection.close(); 214 remoteConnection = null; 215 } 216 if (localConnection != null) { 217 localConnection.close(); 218 localConnection = null; 219 } 220 for (Iterator i = topicConsumerMap.values().iterator();i.hasNext();) { 221 NetworkMessageBridge consumer = (NetworkMessageBridge) i.next(); 222 consumer.stop(); 223 } 224 } 225 } 226 227 /** 228 * Listen for new Consumer events at this broker 229 * 230 * @param client 231 * @param info 232 */ 233 public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) { 234 String brokerName = client.getBrokerConnector().getBrokerInfo().getBrokerName(); 235 if (!client.isClusteredConnection()) { 236 if (connected.get()) { 237 if (!info.hasVisited(remoteBrokerName)) { 238 if (info.isStarted()) { 239 addConsumerInfo(info); 240 } 241 else { 242 removeConsumerInfo(info); 243 } 244 } 245 } 246 else { 247 try { 248 threadPool.execute(new Runnable() { 249 public void run() { 250 if (!client.isClusteredConnection()) { 251 if (!info.hasVisited(remoteBrokerName)) { 252 synchronized (connected) { 253 while (!connected.get() && !stopped.get()) { 254 try { 255 connected.wait(500); 256 } 257 catch (InterruptedException e) { 258 log.debug("interuppted", e); 259 } 260 } 261 if (info.isStarted()) { 262 addConsumerInfo(info); 263 } 264 else { 265 removeConsumerInfo(info); 266 } 267 } 268 } 269 } 270 } 271 }); 272 } 273 catch (InterruptedException e) { 274 log.warn("Failed to process ConsumerInfo: " + info, e); 275 } 276 } 277 } 278 } 279 280 /** 281 * @return the uri of the broker(s) this channel is connected to 282 */ 283 public String getUri() { 284 return uri; 285 } 286 287 /** 288 * set the uri of the broker(s) this channel is connected to 289 * 290 * @param uri 291 */ 292 public void setUri(String uri) { 293 this.uri = uri; 294 } 295 296 /** 297 * @return Returns the remotePassword. 298 */ 299 public String getRemotePassword() { 300 return remotePassword; 301 } 302 303 /** 304 * @param remotePassword The remotePassword to set. 305 */ 306 public void setRemotePassword(String remotePassword) { 307 this.remotePassword = remotePassword; 308 } 309 310 /** 311 * @return Returns the remoteUserName. 312 */ 313 public String getRemoteUserName() { 314 return remoteUserName; 315 } 316 317 /** 318 * @param remoteUserName The remoteUserName to set. 319 */ 320 public void setRemoteUserName(String remoteUserName) { 321 this.remoteUserName = remoteUserName; 322 } 323 324 /** 325 * @return Returns the brokerContainer. 326 */ 327 public BrokerContainer getBrokerContainer() { 328 return brokerContainer; 329 } 330 331 /** 332 * @param brokerContainer The brokerContainer to set. 333 */ 334 public void setBrokerContainer(BrokerContainer brokerContainer) { 335 this.brokerContainer = brokerContainer; 336 } 337 338 public int getMaximumRetries() { 339 return maximumRetries; 340 } 341 342 public void setMaximumRetries(int maximumRetries) { 343 this.maximumRetries = maximumRetries; 344 } 345 346 public long getReconnectSleepTime() { 347 return reconnectSleepTime; 348 } 349 350 public void setReconnectSleepTime(long reconnectSleepTime) { 351 this.reconnectSleepTime = reconnectSleepTime; 352 } 353 354 public String getRemoteBrokerName() { 355 return remoteBrokerName; 356 } 357 358 public void setRemoteBrokerName(String remoteBrokerName) { 359 this.remoteBrokerName = remoteBrokerName; 360 } 361 362 /** 363 * @return Returns the threadPool. 364 */ 365 protected PooledExecutor getThreadPool() { 366 return threadPool; 367 } 368 369 /** 370 * @param threadPool The threadPool to set. 371 */ 372 protected void setThreadPool(PooledExecutor threadPool) { 373 this.threadPool = threadPool; 374 } 375 376 private synchronized ActiveMQConnection getLocalConnection() throws JMSException { 377 if (localConnection == null) { 378 initializeLocal(); 379 } 380 return localConnection; 381 } 382 383 private synchronized ActiveMQConnection getRemoteConnection() throws JMSException { 384 if (remoteConnection == null) { 385 initializeRemote(); 386 } 387 return remoteConnection; 388 } 389 390 /** 391 * @return Returns the localPrefetchPolicy. 392 */ 393 public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() { 394 return localPrefetchPolicy; 395 } 396 397 /** 398 * @param localPrefetchPolicy The localPrefetchPolicy to set. 399 */ 400 public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) { 401 this.localPrefetchPolicy = localPrefetchPolicy; 402 } 403 404 /** 405 * @return Returns the remotePrefetchPolicy. 406 */ 407 public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() { 408 return remotePrefetchPolicy; 409 } 410 411 /** 412 * @param remotePrefetchPolicy The remotePrefetchPolicy to set. 413 */ 414 public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) { 415 this.remotePrefetchPolicy = remotePrefetchPolicy; 416 } 417 418 // Implementation methods 419 //------------------------------------------------------------------------- 420 /** 421 * Implementation of ConnectionAdvisoryEventListener 422 * 423 * @param event 424 */ 425 public void onEvent(ConnectionAdvisoryEvent event) { 426 String localBrokerName = brokerContainer.getBroker().getBrokerName(); 427 if (!event.getInfo().isClosed()) { 428 brokerContainer.registerRemoteClientID(event.getInfo().getClientId()); 429 } 430 else { 431 brokerContainer.deregisterRemoteClientID(event.getInfo().getClientId()); 432 } 433 } 434 435 private void addConsumerInfo(ConsumerInfo info) { 436 addConsumerInfo(info.getDestination(), info.getDestination().isTopic(), info.isDurableTopic()); 437 } 438 439 private void addConsumerInfo(ActiveMQDestination destination, boolean topic, boolean durableTopic) { 440 Map map = topic ? topicConsumerMap : queueConsumerMap; 441 NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(destination.getPhysicalName()); 442 if (bridge == null) { 443 bridge = createBridge(map, destination, durableTopic); 444 } 445 else if (durableTopic && !bridge.isDurableTopic()) { 446 //upgrade our subscription 447 bridge.decrementReferenceCount(); 448 upgradeBridge(bridge); 449 } 450 bridge.incrementReferenceCount(); 451 } 452 453 private void upgradeBridge(NetworkMessageBridge bridge) { 454 try { 455 remoteConnection.stop(); 456 bridge.upgrade(); 457 } 458 catch (JMSException e) { 459 log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: " 460 + bridge.getDestination(), e); 461 } 462 try { 463 remoteConnection.start(); 464 } 465 catch (JMSException e) { 466 log.error("Failed to restart the NetworkMessageBridge", e); 467 } 468 } 469 470 private NetworkMessageBridge createBridge(Map map, ActiveMQDestination destination, boolean durableTopic) { 471 NetworkMessageBridge bridge = new NetworkMessageBridge(); 472 try { 473 bridge.setDestination(destination); 474 bridge.setDurableTopic(durableTopic); 475 bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName()); 476 bridge.setLocalSession(getLocalConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE)); 477 bridge.setRemoteSession(getRemoteConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE)); 478 map.put(destination.getPhysicalName(), bridge); 479 bridge.start(); 480 log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: " 481 + this.toString()); 482 } 483 catch (JMSException jmsEx) { 484 log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx); 485 } 486 return bridge; 487 } 488 489 private void removeConsumerInfo(final ConsumerInfo info) { 490 final String physicalName = info.getDestination().getPhysicalName(); 491 final NetworkMessageBridge bridge = (NetworkMessageBridge) topicConsumerMap.get(physicalName); 492 if (bridge != null) { 493 if (bridge.decrementReferenceCount() <= 0) { 494 try { 495 threadPool.execute(new Runnable() { 496 public void run() { 497 bridge.stop(); 498 topicConsumerMap.remove(physicalName); 499 log.info("stopped MetworkMessageBridge for destination: " + info.getDestination()); 500 } 501 }); 502 } 503 catch (InterruptedException e) { 504 log.warn("got interrupted stoping NetworkBridge", e); 505 } 506 } 507 } 508 } 509 510 private void startSubscriptions() { 511 if (!remote) { 512 MessageContainerManager mcm = brokerContainer.getBroker().getPersistentTopicContainerManager(); 513 if (mcm != null) { 514 Map map = mcm.getLocalDestinations(); 515 startSubscriptions(map, true, true); 516 } 517 mcm = brokerContainer.getBroker().getTransientTopicContainerManager(); 518 if (mcm != null) { 519 Map map = mcm.getLocalDestinations(); 520 startSubscriptions(map, true, false); 521 } 522 mcm = brokerContainer.getBroker().getTransientQueueContainerManager(); 523 if (mcm != null) { 524 Map map = mcm.getLocalDestinations(); 525 startSubscriptions(map, false, false); 526 } 527 mcm = brokerContainer.getBroker().getPersistentQueueContainerManager(); 528 if (mcm != null) { 529 Map map = mcm.getLocalDestinations(); 530 startSubscriptions(map, false, false); 531 } 532 } 533 } 534 535 private void startSubscriptions(Map destinations, boolean topic, boolean durableTopic) { 536 if (destinations != null) { 537 for (Iterator i = destinations.values().iterator();i.hasNext();) { 538 ActiveMQDestination dest = (ActiveMQDestination) i.next(); 539 addConsumerInfo(dest, topic, durableTopic); 540 } 541 } 542 } 543 544 protected void initialize() throws JMSException { 545 // force lazy construction 546 initializeLocal(); 547 initializeRemote(); 548 brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this); 549 } 550 551 private synchronized void initializeRemote() throws JMSException { 552 if (remoteConnection == null) { 553 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri); 554 //factory.setTurboBoost(true); 555 factory.setJ2EEcompliant(false); 556 factory.setQuickClose(true); 557 factory.setInternalConnection(true); 558 remoteConnection = (ActiveMQConnection) factory.createConnection(); 559 TransportChannel transportChannel = remoteConnection.getTransportChannel(); 560 if (transportChannel instanceof CompositeTransportChannel) { 561 CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel; 562 composite.setMaximumRetries(maximumRetries); 563 composite.setFailureSleepTime(reconnectSleepTime); 564 composite.setIncrementTimeout(false); 565 } 566 transportChannel.addTransportStatusEventListener(this); 567 remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel"); 568 remoteConnection.start(); 569 BrokerInfo info = new BrokerInfo(); 570 info.setBrokerName(brokerContainer.getBroker().getBrokerName()); 571 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName()); 572 Receipt receipt = remoteConnection.syncSendRequest(info); 573 if (receipt != null) { 574 remoteBrokerName = receipt.getBrokerName(); 575 remoteClusterName = receipt.getClusterName(); 576 } 577 connectionAdvisor = new ConnectionAdvisor(remoteConnection); 578 connectionAdvisor.addListener(this); 579 connectionAdvisor.start(); 580 if (remotePrefetchPolicy != null) { 581 remoteConnection.setPrefetchPolicy(remotePrefetchPolicy); 582 } 583 } 584 doSetConnected(); 585 } 586 587 private void initializeLocal() throws JMSException { 588 String brokerName = brokerContainer.getBroker().getBrokerName(); 589 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName); 590 factory.setTurboBoost(true); 591 factory.setJ2EEcompliant(false); 592 factory.setBrokerName(brokerName); 593 factory.setQuickClose(true); 594 factory.setInternalConnection(true); 595 localConnection = (ActiveMQConnection) factory.createConnection(); 596 localConnection.start(); 597 BrokerInfo info = new BrokerInfo(); 598 info.setBrokerName(remoteBrokerName); 599 info.setClusterName(remoteClusterName); 600 localConnection.asyncSendPacket(info); 601 if (localPrefetchPolicy != null) { 602 localConnection.setPrefetchPolicy(localPrefetchPolicy); 603 } 604 } 605 606 /*private synchronized void releaseRemote() throws JMSException { 607 if (remoteConnection != null) { 608 TransportChannel transportChannel = remoteConnection.getTransportChannel(); 609 transportChannel.stop(); 610 if (connectionAdvisor != null) { 611 connectionAdvisor.stop(); 612 } 613 try { 614 remoteConnection.stop(); 615 } catch (JMSException e) { 616 // ignore this exception, since the remote broker is most likely down 617 } 618 remoteConnection = null; 619 } 620 }*/ 621 622 }