View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.codehaus.activemq.broker.impl;
20  
21  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.broker.Broker;
26  import org.codehaus.activemq.broker.BrokerClient;
27  import org.codehaus.activemq.broker.BrokerConnector;
28  import org.codehaus.activemq.broker.BrokerContainer;
29  import org.codehaus.activemq.capacity.CapacityMonitorEvent;
30  import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
31  import org.codehaus.activemq.message.ActiveMQDestination;
32  import org.codehaus.activemq.message.ActiveMQMessage;
33  import org.codehaus.activemq.message.ActiveMQXid;
34  import org.codehaus.activemq.message.ConnectionInfo;
35  import org.codehaus.activemq.message.ConsumerInfo;
36  import org.codehaus.activemq.message.DurableUnsubscribe;
37  import org.codehaus.activemq.message.MessageAck;
38  import org.codehaus.activemq.message.ProducerInfo;
39  import org.codehaus.activemq.message.SessionInfo;
40  import org.codehaus.activemq.service.Service;
41  import org.codehaus.activemq.store.PersistenceAdapter;
42  
43  import javax.jms.InvalidClientIDException;
44  import javax.jms.InvalidDestinationException;
45  import javax.jms.JMSException;
46  import javax.jms.JMSSecurityException;
47  import javax.transaction.xa.XAException;
48  import java.util.ArrayList;
49  import java.util.Iterator;
50  import java.util.List;
51  import java.util.Map;
52  
53  /***
54   * Represents the ActiveMQ JMS Broker which typically has one or many connectors
55   *
56   * @version $Revision: 1.19 $
57   */
58  public class BrokerContainerImpl implements BrokerContainer, CapacityMonitorEventListener {
59      private static final Log log = LogFactory.getLog(BrokerContainerImpl.class);
60      private Broker broker;
61      private Map clientIds;
62      private Map consumerInfos;
63      private Map producerInfos;
64      private List connectors;
65      private Thread shutdownHook;
66      private boolean stopped;
67  
68      /***
69       * Default Constructor
70       *
71       * @param brokerName
72       */
73      public BrokerContainerImpl(String brokerName) {
74          this(new DefaultBroker(brokerName));
75      }
76  
77      public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter) {
78          this(new DefaultBroker(brokerName, persistenceAdapter));
79      }
80  
81      /***
82       * @param broker
83       */
84      public BrokerContainerImpl(Broker broker) {
85          this.broker = broker;
86          this.clientIds = new ConcurrentHashMap();
87          this.consumerInfos = new ConcurrentHashMap();
88          this.producerInfos = new ConcurrentHashMap();
89          this.connectors = new CopyOnWriteArrayList();
90          this.broker.addCapacityEventListener(this);
91      }
92  
93      public List getConnectors() {
94          return connectors;
95      }
96  
97      public void setConnectors(List connectors) {
98          this.connectors = connectors;
99      }
100 
101     /***
102      * @return the Broker for the Container
103      */
104     public Broker getBroker() {
105         return broker;
106     }
107 
108     public PersistenceAdapter getPersistenceAdapter() {
109         return broker != null ? broker.getPersistenceAdapter() : null;
110     }
111 
112     public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
113         if (broker == null) {
114             throw new IllegalStateException("Cannot set this property as we don't have a broker yet");
115         }
116         broker.setPersistenceAdapter(persistenceAdapter);
117     }
118 
119     /***
120      * start the Container
121      *
122      * @throws JMSException
123      */
124     public void start() throws JMSException {
125         log.info("ActiveMQ JMS Message Broker is starting");
126         log.info("For help or more information please see: http://activemq.codehaus.org/");
127         broker.start();
128         addShutdownHook();
129         log.info("ActiveMQ JMS Message Broker has started");
130     }
131 
132     protected void addShutdownHook() {
133         shutdownHook = new Thread() {
134             public void run() {
135                 containerShutdown();
136             }
137         };
138         Runtime.getRuntime().addShutdownHook(shutdownHook);
139     }
140 
141     /***
142      * Stop the Container
143      *
144      * @throws JMSException
145      */
146     public synchronized void stop() throws JMSException {
147         if (!stopped) {
148             log.info("ActiveMQ Message Broker is shutting down");
149 
150             try {
151                 Runtime.getRuntime().removeShutdownHook(shutdownHook);
152             }
153             catch (Exception e) {
154                 // Ignore, must be shutting down
155             }
156 
157             JMSException firstException = null;
158 
159             // lets close all the connectors - copying the collection first
160             // TODO we might not need to copy the collection, as maybe the List might not
161             // throw concurrent modification exception? Couldn't tell from the docs
162             // but I don't think it does
163             for (Iterator iter = new ArrayList(connectors).iterator(); iter.hasNext();) {
164                 Service connector = (Service) iter.next();
165                 try {
166                     connector.stop();
167                 }
168                 catch (JMSException e) {
169                     if (firstException == null) {
170                         firstException = e;
171                     }
172                     log.warn("Could not close connector: " + connector + " due to: " + e, e);
173                 }
174             }
175             connectors.clear();
176 
177             // lets close all the channels
178             // note that this Map implementation does not throw concurrent modification exception
179             for (Iterator iter = clientIds.values().iterator(); iter.hasNext();) {
180                 // should remove clients from parent container?
181                 BrokerClient client = (BrokerClient) iter.next();
182                 try {
183                     client.stop();
184                 }
185                 catch (JMSException e) {
186                     if (firstException == null) {
187                         firstException = e;
188                     }
189                     log.warn("Could not close client: " + client + " due to: " + e, e);
190                 }
191             }
192             clientIds.clear();
193 
194             broker.removeCapacityEventListener(this);
195             broker.stop();
196 
197             log.info("ActiveMQ JMS Message Broker stopped");
198 
199             stopped = true;
200             if (firstException != null) {
201                 throw firstException;
202             }
203         }
204     }
205 
206     /***
207      * registers a new Connection
208      *
209      * @param client
210      * @param info   infomation about the client-side Connection
211      * @throws InvalidClientIDException if the ClientID of the Connection is a duplicate
212      */
213     public void registerConnection(BrokerClient client, ConnectionInfo info) throws InvalidClientIDException {
214         String clientId = info.getClientId();
215         if (clientIds.containsKey(clientId)) {
216             throw new InvalidClientIDException("Duplicate clientId: " + info);
217         }
218         log.info("Adding new client: " + clientId + " on transport: " + client.getChannel());
219         clientIds.put(clientId, client);
220     }
221 
222     /***
223      * un-registers a Connection
224      *
225      * @param client
226      * @param info   infomation about the client-side Connection
227      * @throws JMSException
228      */
229     public void deregisterConnection(BrokerClient client, ConnectionInfo info) throws JMSException {
230         String clientId = client.getClientID();
231         if (clientId != null) {
232             Object answer = clientIds.remove(clientId);
233             if (answer != null) {
234                 log.info("Removing client: " + clientId + " on transport: " + client.getChannel());
235                 getBroker().cleanUpClient(client);
236             }
237             else {
238                 log.warn("Got duplicate deregisterConnection for client: " + clientId);
239             }
240         }
241         else {
242             log.warn("No clientID available for client: " + client);
243         }
244     }
245 
246     /***
247      * Registers a MessageConsumer
248      *
249      * @param client
250      * @param info
251      * @throws JMSException
252      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
253      */
254     public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
255         consumerInfos.put(info, client);
256         getBroker().addMessageConsumer(client, info);
257     }
258 
259     /***
260      * De-register a MessageConsumer from the Broker
261      *
262      * @param client
263      * @param info
264      * @throws JMSException
265      */
266     public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
267         consumerInfos.remove(info);
268         getBroker().removeMessageConsumer(client, info);
269     }
270 
271     /***
272      * Registers a MessageProducer
273      *
274      * @param client
275      * @param info
276      * @throws JMSException
277      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
278      */
279     public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
280         ActiveMQDestination dest = info.getDestination();
281         if (dest != null && dest.isTemporary()) {
282             //check to see if the client that is the target for the temporary destination still exists
283             String clientId = ActiveMQDestination.getClientId(dest);
284             if (clientId == null) {
285                 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
286                         + " is a temporary destination with null clientId");
287             }
288             if (!clientIds.containsKey(clientId)) {
289                 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
290                         + " is no longer valid because the client " + clientId + " no longer exists");
291             }
292         }
293         producerInfos.put(info, client);
294     }
295 
296     /***
297      * De-register a MessageProducer from the Broker
298      *
299      * @param client
300      * @param info
301      * @throws JMSException
302      */
303     public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
304         producerInfos.remove(info);
305     }
306 
307     /***
308      * Register a client-side Session (used for Monitoring)
309      *
310      * @param client
311      * @param info
312      * @throws JMSException
313      */
314     public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
315     }
316 
317     /***
318      * De-register a client-side Session from the Broker (used for monitoring)
319      *
320      * @param client
321      * @param info
322      * @throws JMSException
323      */
324     public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
325     }
326 
327     /***
328      * Start a transaction from the Client session
329      *
330      * @param client
331      * @param transactionId
332      * @throws JMSException
333      */
334     public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
335         getBroker().startTransaction(client, transactionId);
336     }
337 
338     /***
339      * Rollback a transacton
340      *
341      * @param client
342      * @param transactionId
343      * @throws JMSException
344      */
345     public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
346         getBroker().rollbackTransaction(client, transactionId);
347     }
348 
349     /***
350      * Commit a transaction
351      *
352      * @param client
353      * @param transactionId
354      * @throws JMSException
355      */
356     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
357         getBroker().commitTransaction(client, transactionId);
358     }
359 
360     /***
361      * send message with a transaction context
362      *
363      * @param client
364      * @param transactionId
365      * @param message
366      * @throws JMSException
367      */
368     public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
369             throws JMSException {
370         getBroker().sendTransactedMessage(client, transactionId, message);
371     }
372 
373     /***
374      * Acknowledge receipt of a message within a transaction context
375      *
376      * @param client
377      * @param transactionId
378      * @param ack
379      * @throws JMSException
380      */
381     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
382             throws JMSException {
383         getBroker().acknowledgeTransactedMessage(client, transactionId, ack);
384     }
385 
386     /***
387      * Send a non-transacted message to the Broker
388      *
389      * @param client
390      * @param message
391      * @throws JMSException
392      */
393     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
394         getBroker().sendMessage(client, message);
395     }
396 
397     /***
398      * Acknowledge reciept of a message
399      *
400      * @param client
401      * @param ack
402      * @throws JMSException
403      */
404     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
405         getBroker().acknowledgeMessage(client, ack);
406     }
407 
408     /***
409      * Command to delete a durable topic subscription
410      *
411      * @param client
412      * @param ds
413      * @throws JMSException
414      */
415     public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
416         getBroker().deleteSubscription(ds.getClientId(), ds.getSubscriberName());
417     }
418 
419     /***
420      * Start an XA transaction.
421      *
422      * @param client
423      * @param xid
424      */
425     public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
426         getBroker().startTransaction(client, xid);
427     }
428 
429     /***
430      * Gets the prepared XA transactions.
431      *
432      * @param client
433      * @return
434      */
435     public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
436         return getBroker().getPreparedTransactions(client);
437     }
438 
439     /***
440      * Prepare an XA transaction.
441      *
442      * @param client
443      * @param xid
444      */
445     public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
446         return getBroker().prepareTransaction(client, xid);
447     }
448 
449     /***
450      * Rollback an XA transaction.
451      *
452      * @param client
453      * @param xid
454      */
455     public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
456         getBroker().rollbackTransaction(client, xid);
457     }
458 
459     /***
460      * Commit an XA transaction.
461      *
462      * @param client
463      * @param xid
464      * @param onePhase
465      */
466     public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
467         getBroker().commitTransaction(client, xid, onePhase);
468     }
469 
470     public void addConnector(BrokerConnector connector) {
471         connectors.add(connector);
472     }
473 
474     public void removeConnector(BrokerConnector connector) {
475         connectors.remove(connector);
476     }
477 
478     /***
479      * Update any message producers about our capacity to handle messages
480      *
481      * @param event
482      */
483     public void capacityChanged(CapacityMonitorEvent event) {
484         //only send to producers
485         for (Iterator i = producerInfos.values().iterator(); i.hasNext();) {
486             BrokerClient client = (BrokerClient) i.next();
487             client.updateBrokerCapacity(event.getCapacity());
488         }
489     }
490 
491     /***
492      * Causes a clean shutdown of the container when the VM is being shut down
493      */
494     protected void containerShutdown() {
495         try {
496             stop();
497         }
498         catch (JMSException e) {
499             Exception linkedException = e.getLinkedException();
500             if (linkedException != null) {
501                 log.error("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
502             }
503             else {
504                 log.error("Failed to shut down: " + e, e);
505             }
506         }
507         catch (Exception e) {
508             log.error("Failed to shut down: " + e, e);
509         }
510     }
511 }