001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.broker.impl; 019 020 import org.activemq.broker.BrokerClient; 021 import org.activemq.broker.BrokerConnector; 022 import org.activemq.broker.BrokerContainer; 023 import org.activemq.io.WireFormat; 024 import org.activemq.message.ActiveMQMessage; 025 import org.activemq.message.ActiveMQXid; 026 import org.activemq.message.BrokerInfo; 027 import org.activemq.message.ConnectionInfo; 028 import org.activemq.message.ConsumerInfo; 029 import org.activemq.message.DurableUnsubscribe; 030 import org.activemq.message.MessageAck; 031 import org.activemq.message.ProducerInfo; 032 import org.activemq.message.SessionInfo; 033 import org.activemq.transport.TransportChannel; 034 import org.activemq.transport.TransportChannelListener; 035 import org.activemq.transport.TransportServerChannel; 036 import org.activemq.transport.TransportServerChannelProvider; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 import javax.jms.JMSException; 041 import javax.jms.JMSSecurityException; 042 import javax.transaction.xa.XAException; 043 import java.net.URI; 044 import java.net.URISyntaxException; 045 import java.util.Collections; 046 import java.util.HashMap; 047 import java.util.Map; 048 049 /** 050 * An implementation of the broker (the JMS server) 051 * 052 * @version $Revision: 1.1.1.1 $ 053 */ 054 public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener { 055 private BrokerInfo brokerInfo; 056 057 private TransportServerChannel serverChannel; 058 private Log log; 059 private BrokerContainer container; 060 private Map clients = Collections.synchronizedMap(new HashMap()); 061 062 /** 063 * Helper constructor for TCP protocol with the given bind address 064 * 065 * @param container 066 * @param bindAddress 067 * @throws JMSException 068 */ 069 public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException { 070 this(container, createTransportServerChannel(wireFormat, bindAddress)); 071 } 072 073 /** 074 * @param container 075 * @param serverChannel 076 */ 077 public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) { 078 assert container != null; 079 this.brokerInfo = new BrokerInfo(); 080 this.brokerInfo.setBrokerName(container.getBroker().getBrokerName()); 081 this.brokerInfo.setClusterName(container.getBroker().getBrokerClusterName()); 082 this.log = LogFactory.getLog(getClass().getName()); 083 this.serverChannel = serverChannel; 084 this.container = container; 085 this.container.addConnector(this); 086 serverChannel.setTransportChannelListener(this); 087 } 088 089 /** 090 * @return infomation about the Broker 091 */ 092 public BrokerInfo getBrokerInfo() { 093 return brokerInfo; 094 } 095 096 /** 097 * Get a hint about the broker capacity for more messages 098 * 099 * @return percentage value (0-100) about how much capacity the 100 * broker has 101 */ 102 public int getBrokerCapacity() { 103 return container.getBroker().getRoundedCapacity(); 104 } 105 106 /** 107 * @return Get the server channel 108 */ 109 public TransportServerChannel getServerChannel() { 110 return serverChannel; 111 } 112 113 /** 114 * start the Broker 115 * 116 * @throws JMSException 117 */ 118 public void start() throws JMSException { 119 this.serverChannel.start(); 120 log.info("ActiveMQ connector started: " + serverChannel); 121 } 122 123 /** 124 * Stop the Broker 125 * 126 * @throws JMSException 127 */ 128 public void stop() throws JMSException { 129 this.container.removeConnector(this); 130 this.serverChannel.stop(); 131 log.info("ActiveMQ connector stopped: " + serverChannel); 132 } 133 134 /** 135 * Register a Broker Client 136 * 137 * @param client 138 * @param info contains infomation about the Connection this Client represents 139 * @throws JMSException 140 * @throws javax.jms.InvalidClientIDException 141 * if the JMS client specifies an invalid or duplicate client ID. 142 * @throws JMSSecurityException if client authentication fails due to an invalid user name or password. 143 */ 144 public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException { 145 this.container.registerConnection(client, info); 146 } 147 148 /** 149 * Deregister a Broker Client 150 * 151 * @param client 152 * @param info 153 * @throws JMSException if some internal error occurs 154 */ 155 public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException { 156 this.container.deregisterConnection(client, info); 157 } 158 159 /** 160 * Registers a MessageConsumer 161 * 162 * @param client 163 * @param info 164 * @throws JMSException 165 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for 166 */ 167 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 168 if (info.getDestination() == null) { 169 throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info); 170 } 171 this.container.registerMessageConsumer(client, info); 172 173 } 174 175 /** 176 * De-register a MessageConsumer from the Broker 177 * 178 * @param client 179 * @param info 180 * @throws JMSException 181 */ 182 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException { 183 this.container.deregisterMessageConsumer(client, info); 184 } 185 186 /** 187 * Registers a MessageProducer 188 * 189 * @param client 190 * @param info 191 * @throws JMSException 192 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for 193 */ 194 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException { 195 this.container.registerMessageProducer(client, info); 196 } 197 198 /** 199 * De-register a MessageProducer from the Broker 200 * 201 * @param client 202 * @param info 203 * @throws JMSException 204 */ 205 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException { 206 this.container.deregisterMessageProducer(client, info); 207 } 208 209 /** 210 * Register a client-side Session (used for Monitoring) 211 * 212 * @param client 213 * @param info 214 * @throws JMSException 215 */ 216 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException { 217 this.container.registerSession(client, info); 218 } 219 220 /** 221 * De-register a client-side Session from the Broker (used for monitoring) 222 * 223 * @param client 224 * @param info 225 * @throws JMSException 226 */ 227 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException { 228 this.container.deregisterSession(client, info); 229 } 230 231 /** 232 * Start a transaction from the Client session 233 * 234 * @param client 235 * @param transactionId 236 * @throws JMSException 237 */ 238 public void startTransaction(BrokerClient client, String transactionId) throws JMSException { 239 this.container.startTransaction(client, transactionId); 240 } 241 242 /** 243 * Rollback a transacton 244 * 245 * @param client 246 * @param transactionId 247 * @throws JMSException 248 */ 249 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException { 250 this.container.rollbackTransaction(client, transactionId); 251 } 252 253 /** 254 * Commit a transaction 255 * 256 * @param client 257 * @param transactionId 258 * @throws JMSException 259 */ 260 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException { 261 this.container.commitTransaction(client, transactionId); 262 } 263 264 /** 265 * Send a non-transacted message to the Broker 266 * 267 * @param client 268 * @param message 269 * @throws JMSException 270 */ 271 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException { 272 this.container.sendMessage(client, message); 273 } 274 275 /** 276 * Acknowledge reciept of a message 277 * 278 * @param client 279 * @param ack 280 * @throws JMSException 281 */ 282 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException { 283 this.container.acknowledgeMessage(client, ack); 284 } 285 286 /** 287 * Command to delete a durable topic subscription 288 * 289 * @param client 290 * @param ds 291 * @throws JMSException 292 */ 293 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException { 294 this.container.durableUnsubscribe(client, ds); 295 } 296 297 298 /** 299 * @param channel - client to add 300 */ 301 public void addClient(TransportChannel channel) { 302 try { 303 BrokerClient client = new BrokerClientImpl(); 304 client.initialize(this, channel); 305 if (log.isDebugEnabled()) { 306 log.debug("Starting new client: " + client); 307 } 308 channel.setServerSide(true); 309 channel.start(); 310 clients.put(channel, client); 311 } 312 catch (JMSException e) { 313 log.error("Failed to add client due to: " + e, e); 314 } 315 } 316 317 /** 318 * @param channel - client to remove 319 */ 320 public void removeClient(TransportChannel channel) { 321 BrokerClient client = (BrokerClient) clients.remove(channel); 322 if (client != null) { 323 if (log.isDebugEnabled()) { 324 log.debug("Client leaving client: " + client); 325 } 326 327 // we may have already been closed, if not then lets simulate a normal shutdown 328 client.cleanUp(); 329 } 330 else { 331 // might have got a duplicate callback 332 log.warn("No such client for channel: " + channel); 333 } 334 } 335 336 /** 337 * @return the BrokerContainer for this Connector 338 */ 339 public BrokerContainer getBrokerContainer() { 340 return this.container; 341 } 342 343 /** 344 * Start an XA transaction. 345 * 346 * @see org.activemq.broker.BrokerConnector#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 347 */ 348 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 349 this.container.startTransaction(client, xid); 350 } 351 352 /** 353 * Gets the prepared XA transactions. 354 * 355 * @see org.activemq.broker.BrokerConnector#getPreparedTransactions(org.activemq.broker.BrokerClient) 356 */ 357 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException { 358 return this.container.getPreparedTransactions(client); 359 } 360 361 /** 362 * Prepare an XA transaction. 363 * 364 * @see org.activemq.broker.BrokerConnector#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 365 */ 366 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 367 return this.container.prepareTransaction(client, xid); 368 } 369 370 /** 371 * Rollback an XA transaction. 372 * 373 * @see org.activemq.broker.BrokerConnector#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid) 374 */ 375 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException { 376 this.container.rollbackTransaction(client, xid); 377 } 378 379 /** 380 * Commit an XA transaction. 381 * 382 * @see org.activemq.broker.BrokerConnector#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean) 383 */ 384 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException { 385 this.container.commitTransaction(client, xid, onePhase); 386 } 387 388 /** 389 * @see org.activemq.broker.BrokerConnector#getResourceManagerId(org.activemq.broker.BrokerClient) 390 */ 391 public String getResourceManagerId(BrokerClient client) { 392 // TODO: I think we need to return a better (more unique) RM id. 393 return getBrokerInfo().getBrokerName(); 394 } 395 396 397 // Implementation methods 398 //------------------------------------------------------------------------- 399 /** 400 * Factory method ot create a transport channel 401 * 402 * @param bindAddress 403 * @return @throws JMSException 404 * @throws JMSException 405 */ 406 protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException { 407 URI url; 408 try { 409 url = new URI(bindAddress); 410 } 411 catch (URISyntaxException e) { 412 JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage()); 413 jmsEx.setLinkedException(e); 414 throw jmsEx; 415 } 416 return TransportServerChannelProvider.create(wireFormat, url); 417 } 418 419 }