1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.broker.impl;
20
21 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.broker.Broker;
26 import org.codehaus.activemq.broker.BrokerClient;
27 import org.codehaus.activemq.broker.BrokerConnector;
28 import org.codehaus.activemq.broker.BrokerContainer;
29 import org.codehaus.activemq.capacity.CapacityMonitorEvent;
30 import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
31 import org.codehaus.activemq.message.ActiveMQDestination;
32 import org.codehaus.activemq.message.ActiveMQMessage;
33 import org.codehaus.activemq.message.ActiveMQXid;
34 import org.codehaus.activemq.message.ConnectionInfo;
35 import org.codehaus.activemq.message.ConsumerInfo;
36 import org.codehaus.activemq.message.DurableUnsubscribe;
37 import org.codehaus.activemq.message.MessageAck;
38 import org.codehaus.activemq.message.ProducerInfo;
39 import org.codehaus.activemq.message.SessionInfo;
40 import org.codehaus.activemq.service.Service;
41 import org.codehaus.activemq.store.PersistenceAdapter;
42
43 import javax.jms.InvalidClientIDException;
44 import javax.jms.InvalidDestinationException;
45 import javax.jms.JMSException;
46 import javax.jms.JMSSecurityException;
47 import javax.transaction.xa.XAException;
48 import java.util.ArrayList;
49 import java.util.Iterator;
50 import java.util.List;
51 import java.util.Map;
52
53 /***
54 * Represents the ActiveMQ JMS Broker which typically has one or many connectors
55 *
56 * @version $Revision: 1.19 $
57 */
58 public class BrokerContainerImpl implements BrokerContainer, CapacityMonitorEventListener {
59 private static final Log log = LogFactory.getLog(BrokerContainerImpl.class);
60 private Broker broker;
61 private Map clientIds;
62 private Map consumerInfos;
63 private Map producerInfos;
64 private List connectors;
65 private Thread shutdownHook;
66 private boolean stopped;
67
68 /***
69 * Default Constructor
70 *
71 * @param brokerName
72 */
73 public BrokerContainerImpl(String brokerName) {
74 this(new DefaultBroker(brokerName));
75 }
76
77 public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter) {
78 this(new DefaultBroker(brokerName, persistenceAdapter));
79 }
80
81 /***
82 * @param broker
83 */
84 public BrokerContainerImpl(Broker broker) {
85 this.broker = broker;
86 this.clientIds = new ConcurrentHashMap();
87 this.consumerInfos = new ConcurrentHashMap();
88 this.producerInfos = new ConcurrentHashMap();
89 this.connectors = new CopyOnWriteArrayList();
90 this.broker.addCapacityEventListener(this);
91 }
92
93 public List getConnectors() {
94 return connectors;
95 }
96
97 public void setConnectors(List connectors) {
98 this.connectors = connectors;
99 }
100
101 /***
102 * @return the Broker for the Container
103 */
104 public Broker getBroker() {
105 return broker;
106 }
107
108 public PersistenceAdapter getPersistenceAdapter() {
109 return broker != null ? broker.getPersistenceAdapter() : null;
110 }
111
112 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
113 if (broker == null) {
114 throw new IllegalStateException("Cannot set this property as we don't have a broker yet");
115 }
116 broker.setPersistenceAdapter(persistenceAdapter);
117 }
118
119 /***
120 * start the Container
121 *
122 * @throws JMSException
123 */
124 public void start() throws JMSException {
125 log.info("ActiveMQ JMS Message Broker is starting");
126 log.info("For help or more information please see: http://activemq.codehaus.org/");
127 broker.start();
128 addShutdownHook();
129 log.info("ActiveMQ JMS Message Broker has started");
130 }
131
132 protected void addShutdownHook() {
133 shutdownHook = new Thread() {
134 public void run() {
135 containerShutdown();
136 }
137 };
138 Runtime.getRuntime().addShutdownHook(shutdownHook);
139 }
140
141 /***
142 * Stop the Container
143 *
144 * @throws JMSException
145 */
146 public synchronized void stop() throws JMSException {
147 if (!stopped) {
148 log.info("ActiveMQ Message Broker is shutting down");
149
150 try {
151 Runtime.getRuntime().removeShutdownHook(shutdownHook);
152 }
153 catch (Exception e) {
154
155 }
156
157 JMSException firstException = null;
158
159
160
161
162
163 for (Iterator iter = new ArrayList(connectors).iterator(); iter.hasNext();) {
164 Service connector = (Service) iter.next();
165 try {
166 connector.stop();
167 }
168 catch (JMSException e) {
169 if (firstException == null) {
170 firstException = e;
171 }
172 log.warn("Could not close connector: " + connector + " due to: " + e, e);
173 }
174 }
175 connectors.clear();
176
177
178
179 for (Iterator iter = clientIds.values().iterator(); iter.hasNext();) {
180
181 BrokerClient client = (BrokerClient) iter.next();
182 try {
183 client.stop();
184 }
185 catch (JMSException e) {
186 if (firstException == null) {
187 firstException = e;
188 }
189 log.warn("Could not close client: " + client + " due to: " + e, e);
190 }
191 }
192 clientIds.clear();
193
194 broker.removeCapacityEventListener(this);
195 broker.stop();
196
197 log.info("ActiveMQ JMS Message Broker stopped");
198
199 stopped = true;
200 if (firstException != null) {
201 throw firstException;
202 }
203 }
204 }
205
206 /***
207 * registers a new Connection
208 *
209 * @param client
210 * @param info infomation about the client-side Connection
211 * @throws InvalidClientIDException if the ClientID of the Connection is a duplicate
212 */
213 public void registerConnection(BrokerClient client, ConnectionInfo info) throws InvalidClientIDException {
214 String clientId = info.getClientId();
215 if (clientIds.containsKey(clientId)) {
216 throw new InvalidClientIDException("Duplicate clientId: " + info);
217 }
218 log.info("Adding new client: " + clientId + " on transport: " + client.getChannel());
219 clientIds.put(clientId, client);
220 }
221
222 /***
223 * un-registers a Connection
224 *
225 * @param client
226 * @param info infomation about the client-side Connection
227 * @throws JMSException
228 */
229 public void deregisterConnection(BrokerClient client, ConnectionInfo info) throws JMSException {
230 String clientId = client.getClientID();
231 if (clientId != null) {
232 Object answer = clientIds.remove(clientId);
233 if (answer != null) {
234 log.info("Removing client: " + clientId + " on transport: " + client.getChannel());
235 getBroker().cleanUpClient(client);
236 }
237 else {
238 log.warn("Got duplicate deregisterConnection for client: " + clientId);
239 }
240 }
241 else {
242 log.warn("No clientID available for client: " + client);
243 }
244 }
245
246 /***
247 * Registers a MessageConsumer
248 *
249 * @param client
250 * @param info
251 * @throws JMSException
252 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
253 */
254 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
255 consumerInfos.put(info, client);
256 getBroker().addMessageConsumer(client, info);
257 }
258
259 /***
260 * De-register a MessageConsumer from the Broker
261 *
262 * @param client
263 * @param info
264 * @throws JMSException
265 */
266 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
267 consumerInfos.remove(info);
268 getBroker().removeMessageConsumer(client, info);
269 }
270
271 /***
272 * Registers a MessageProducer
273 *
274 * @param client
275 * @param info
276 * @throws JMSException
277 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
278 */
279 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
280 ActiveMQDestination dest = info.getDestination();
281 if (dest != null && dest.isTemporary()) {
282
283 String clientId = ActiveMQDestination.getClientId(dest);
284 if (clientId == null) {
285 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
286 + " is a temporary destination with null clientId");
287 }
288 if (!clientIds.containsKey(clientId)) {
289 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
290 + " is no longer valid because the client " + clientId + " no longer exists");
291 }
292 }
293 producerInfos.put(info, client);
294 }
295
296 /***
297 * De-register a MessageProducer from the Broker
298 *
299 * @param client
300 * @param info
301 * @throws JMSException
302 */
303 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
304 producerInfos.remove(info);
305 }
306
307 /***
308 * Register a client-side Session (used for Monitoring)
309 *
310 * @param client
311 * @param info
312 * @throws JMSException
313 */
314 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
315 }
316
317 /***
318 * De-register a client-side Session from the Broker (used for monitoring)
319 *
320 * @param client
321 * @param info
322 * @throws JMSException
323 */
324 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
325 }
326
327 /***
328 * Start a transaction from the Client session
329 *
330 * @param client
331 * @param transactionId
332 * @throws JMSException
333 */
334 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
335 getBroker().startTransaction(client, transactionId);
336 }
337
338 /***
339 * Rollback a transacton
340 *
341 * @param client
342 * @param transactionId
343 * @throws JMSException
344 */
345 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
346 getBroker().rollbackTransaction(client, transactionId);
347 }
348
349 /***
350 * Commit a transaction
351 *
352 * @param client
353 * @param transactionId
354 * @throws JMSException
355 */
356 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
357 getBroker().commitTransaction(client, transactionId);
358 }
359
360 /***
361 * send message with a transaction context
362 *
363 * @param client
364 * @param transactionId
365 * @param message
366 * @throws JMSException
367 */
368 public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
369 throws JMSException {
370 getBroker().sendTransactedMessage(client, transactionId, message);
371 }
372
373 /***
374 * Acknowledge receipt of a message within a transaction context
375 *
376 * @param client
377 * @param transactionId
378 * @param ack
379 * @throws JMSException
380 */
381 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
382 throws JMSException {
383 getBroker().acknowledgeTransactedMessage(client, transactionId, ack);
384 }
385
386 /***
387 * Send a non-transacted message to the Broker
388 *
389 * @param client
390 * @param message
391 * @throws JMSException
392 */
393 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
394 getBroker().sendMessage(client, message);
395 }
396
397 /***
398 * Acknowledge reciept of a message
399 *
400 * @param client
401 * @param ack
402 * @throws JMSException
403 */
404 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
405 getBroker().acknowledgeMessage(client, ack);
406 }
407
408 /***
409 * Command to delete a durable topic subscription
410 *
411 * @param client
412 * @param ds
413 * @throws JMSException
414 */
415 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
416 getBroker().deleteSubscription(ds.getClientId(), ds.getSubscriberName());
417 }
418
419 /***
420 * Start an XA transaction.
421 *
422 * @param client
423 * @param xid
424 */
425 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
426 getBroker().startTransaction(client, xid);
427 }
428
429 /***
430 * Gets the prepared XA transactions.
431 *
432 * @param client
433 * @return
434 */
435 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
436 return getBroker().getPreparedTransactions(client);
437 }
438
439 /***
440 * Prepare an XA transaction.
441 *
442 * @param client
443 * @param xid
444 */
445 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
446 return getBroker().prepareTransaction(client, xid);
447 }
448
449 /***
450 * Rollback an XA transaction.
451 *
452 * @param client
453 * @param xid
454 */
455 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
456 getBroker().rollbackTransaction(client, xid);
457 }
458
459 /***
460 * Commit an XA transaction.
461 *
462 * @param client
463 * @param xid
464 * @param onePhase
465 */
466 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
467 getBroker().commitTransaction(client, xid, onePhase);
468 }
469
470 public void addConnector(BrokerConnector connector) {
471 connectors.add(connector);
472 }
473
474 public void removeConnector(BrokerConnector connector) {
475 connectors.remove(connector);
476 }
477
478 /***
479 * Update any message producers about our capacity to handle messages
480 *
481 * @param event
482 */
483 public void capacityChanged(CapacityMonitorEvent event) {
484
485 for (Iterator i = producerInfos.values().iterator(); i.hasNext();) {
486 BrokerClient client = (BrokerClient) i.next();
487 client.updateBrokerCapacity(event.getCapacity());
488 }
489 }
490
491 /***
492 * Causes a clean shutdown of the container when the VM is being shut down
493 */
494 protected void containerShutdown() {
495 try {
496 stop();
497 }
498 catch (JMSException e) {
499 Exception linkedException = e.getLinkedException();
500 if (linkedException != null) {
501 log.error("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
502 }
503 else {
504 log.error("Failed to shut down: " + e, e);
505 }
506 }
507 catch (Exception e) {
508 log.error("Failed to shut down: " + e, e);
509 }
510 }
511 }