1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.broker.BrokerConnector;
23 import org.codehaus.activemq.broker.BrokerContainer;
24 import org.codehaus.activemq.broker.BrokerContainerFactory;
25 import org.codehaus.activemq.broker.impl.BrokerClientImpl;
26 import org.codehaus.activemq.broker.impl.BrokerConnectorImpl;
27 import org.codehaus.activemq.broker.impl.BrokerContainerFactoryImpl;
28 import org.codehaus.activemq.jndi.JNDIBaseStorable;
29 import org.codehaus.activemq.management.JMSStatsImpl;
30 import org.codehaus.activemq.management.StatsCapable;
31 import org.codehaus.activemq.message.ActiveMQQueue;
32 import org.codehaus.activemq.message.ActiveMQTopic;
33 import org.codehaus.activemq.message.ConnectionInfo;
34 import org.codehaus.activemq.message.ConsumerInfo;
35 import org.codehaus.activemq.message.DefaultWireFormat;
36 import org.codehaus.activemq.message.WireFormat;
37 import org.codehaus.activemq.service.Service;
38 import org.codehaus.activemq.transport.TransportChannel;
39 import org.codehaus.activemq.transport.TransportChannelFactory;
40 import org.codehaus.activemq.transport.TransportChannelListener;
41 import org.codehaus.activemq.transport.TransportChannelProvider;
42 import org.codehaus.activemq.transport.vm.VmTransportChannel;
43 import org.codehaus.activemq.util.IdGenerator;
44
45 import javax.jms.Connection;
46 import javax.jms.ConnectionFactory;
47 import javax.jms.JMSException;
48 import javax.jms.QueueConnection;
49 import javax.jms.QueueConnectionFactory;
50 import javax.jms.TopicConnection;
51 import javax.jms.TopicConnectionFactory;
52 import javax.management.j2ee.statistics.Stats;
53 import java.net.URI;
54 import java.net.URISyntaxException;
55 import java.util.ArrayList;
56 import java.util.HashMap;
57 import java.util.Iterator;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Properties;
61
62 /***
63 * A ConnectionFactory is an an Administed object, and is used for creating
64 * Connections.
65 * <p/>
66 * This class also implements QueueConnectionFactory and TopicConnectionFactory and is an Administered object.
67 * You can use this connection to create both QueueConnections and TopicConnections.
68 *
69 * @version $Revision: 1.29 $
70 * @see javax.jms.ConnectionFactory
71 */
72 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable {
73
74 private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class);
75
76 protected static BrokerContainer container;
77 private static Map brokers = new HashMap();
78
79 protected String userName;
80 protected String password;
81 protected String brokerURL;
82 protected String clientID;
83 private boolean useEmbeddedBroker;
84
85 private List startedEmbeddedBrokers = new ArrayList();
86
87 private JMSStatsImpl stats = new JMSStatsImpl();
88 private WireFormat wireFormat = new DefaultWireFormat();
89 private IdGenerator idGenerator = new IdGenerator();
90 private int connectionCount;
91 private BrokerContainerFactory brokerContainerFactory;
92
93 /***
94 * Default Constructor for ActiveMQConnectionFactory
95 */
96 public ActiveMQConnectionFactory() {
97 this.userName = ActiveMQConnection.DEFAULT_USER;
98 this.password = ActiveMQConnection.DEFAULT_PASSWORD;
99 this.brokerURL = ActiveMQConnection.DEFAULT_URL;
100 }
101
102 public ActiveMQConnectionFactory(String brokerURL) {
103 this();
104 this.brokerURL = brokerURL;
105 }
106
107 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
108 this.userName = userName;
109 this.password = password;
110 this.brokerURL = brokerURL;
111 }
112
113 public Stats getStats() {
114 return stats;
115 }
116
117 public JMSStatsImpl getFactoryStats() {
118 return stats;
119 }
120
121 /***
122 * @return Returns the brokerURL.
123 */
124 public String getBrokerURL() {
125 return brokerURL;
126 }
127
128 /***
129 * @param brokerURL The brokerURL to set.
130 */
131 public void setBrokerURL(String brokerURL) {
132 this.brokerURL = brokerURL;
133 }
134
135 /***
136 * @return Returns the clientID.
137 */
138 public String getClientID() {
139 return clientID;
140 }
141
142 /***
143 * @param clientID The clientID to set.
144 */
145 public void setClientID(String clientID) {
146 this.clientID = clientID;
147 }
148
149 /***
150 * @return Returns the password.
151 */
152 public String getPassword() {
153 return password;
154 }
155
156 /***
157 * @param password The password to set.
158 */
159 public void setPassword(String password) {
160 this.password = password;
161 }
162
163 /***
164 * @return Returns the userName.
165 */
166 public String getUserName() {
167 return userName;
168 }
169
170 /***
171 * @param userName The userName to set.
172 */
173 public void setUserName(String userName) {
174 this.userName = userName;
175 }
176
177 /***
178 * Is an embedded broker used by this connection factory
179 *
180 * @return true if an embedded broker will be used by this connection factory
181 */
182 public boolean isUseEmbeddedBroker() {
183 return useEmbeddedBroker;
184 }
185
186 /***
187 * Allows embedded brokers to be associated with a connection factory
188 *
189 * @param useEmbeddedBroker
190 */
191 public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
192 this.useEmbeddedBroker = useEmbeddedBroker;
193 }
194
195 public WireFormat getWireFormat() {
196 return wireFormat;
197 }
198
199 /***
200 * Allows a custom wire format to be used; otherwise the default Java wire format is used
201 * which is designed for minimum size and maximum speed on the Java platform
202 *
203 * @param wireFormat
204 */
205 public void setWireFormat(WireFormat wireFormat) {
206 this.wireFormat = wireFormat;
207 }
208
209 public BrokerContainerFactory getBrokerContainerFactory() {
210 if (brokerContainerFactory == null) {
211 brokerContainerFactory = new BrokerContainerFactoryImpl();
212 }
213 return brokerContainerFactory;
214 }
215
216 public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) {
217 this.brokerContainerFactory = brokerContainerFactory;
218 }
219
220 /***
221 * Set the properties that will represent the instance in JNDI
222 *
223 * @param props
224 */
225 protected void buildFromProperties(Properties props) {
226 this.userName = props.getProperty("userName", this.userName);
227 this.password = props.getProperty("password", this.password);
228 this.brokerURL = props.getProperty("brokerURL", this.brokerURL);
229 this.clientID = props.getProperty("clientID");
230 this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker");
231 }
232
233 /***
234 * Initialize the instance from properties stored in JNDI
235 *
236 * @param props
237 */
238 protected void populateProperties(Properties props) {
239 props.put("userName", this.userName);
240 props.put("password", this.password);
241 props.put("brokerURL", this.brokerURL);
242 if (this.clientID != null) {
243 props.put("clientID", this.clientID);
244 }
245 props.put("useEmbeddedBroker", (useEmbeddedBroker) ? "true" : "false");
246 }
247
248 /***
249 * Helper method to return the property value as a boolean flag
250 */
251 protected boolean getBoolean(Properties props, String key) {
252 String value = props.getProperty(key);
253 return value != null && value.equalsIgnoreCase("true");
254 }
255
256 /***
257 * Create a JMS Connection
258 *
259 * @return the JMS Connection
260 * @throws JMSException if an error occurs creating the Connection
261 */
262 public Connection createConnection() throws JMSException {
263 return this.createConnection(this.userName, this.password);
264 }
265
266 /***
267 * @param userName
268 * @param password
269 * @return the Connection
270 * @throws JMSException if an error occurs creating the Connection
271 */
272 public Connection createConnection(String userName, String password) throws JMSException {
273 ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL));
274 if (this.clientID != null && this.clientID.length() > 0) {
275 connection.setClientID(this.clientID);
276 }
277 return connection;
278 }
279
280 /***
281 * Create a JMS QueueConnection
282 *
283 * @return the JMS QueueConnection
284 * @throws JMSException if an error occurs creating the Connection
285 */
286 public QueueConnection createQueueConnection() throws JMSException {
287 return this.createQueueConnection(this.userName, this.password);
288 }
289
290 /***
291 * @param userName
292 * @param password
293 * @return the QueueConnection
294 * @throws JMSException if an error occurs creating the Connection
295 */
296 public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
297 return (QueueConnection) createConnection(userName, password);
298 }
299
300 /***
301 * Create a JMS TopicConnection
302 *
303 * @return the JMS TopicConnection
304 * @throws JMSException if an error occurs creating the Connection
305 */
306 public TopicConnection createTopicConnection() throws JMSException {
307 return this.createTopicConnection(this.userName, this.password);
308 }
309
310 /***
311 * @param userName
312 * @param password
313 * @return the TopicConnection
314 * @throws JMSException if an error occurs creating the Connection
315 */
316 public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
317 return (TopicConnection) createConnection(userName, password);
318 }
319
320
321 public void start() throws JMSException {
322 }
323
324 /***
325 * A hook to allow any embedded JMS Broker's to be closed down
326 *
327 * @throws JMSException
328 */
329 public synchronized void stop() throws JMSException {
330
331 for (Iterator iter = startedEmbeddedBrokers.iterator(); iter.hasNext();) {
332 String uri = (String) iter.next();
333 unregisterBroker(uri);
334 }
335 if (container != null) {
336 container.stop();
337 container = null;
338 }
339 }
340
341 /***
342 * Factory method to create a TransportChannel from a URL
343 */
344 protected TransportChannel createTransportChannel(String theURLString) throws JMSException {
345 URI uri = createURI(theURLString);
346
347 TransportChannelFactory factory =
348 TransportChannelProvider.getFactory(uri);
349
350 BrokerConnector brokerConnector = null;
351 boolean created = false;
352 boolean embedServer = isUseEmbeddedBroker() || factory.requiresEmbeddedBroker();
353 if (embedServer) {
354 synchronized (this) {
355 brokerConnector = (BrokerConnector) brokers.get(theURLString);
356 if (brokerConnector == null) {
357 brokerConnector = createBrokerConnector(theURLString);
358 registerBroker(theURLString, brokerConnector);
359 startedEmbeddedBrokers.add(theURLString);
360 created = true;
361 }
362 }
363 }
364 TransportChannel transportChannel = factory.create(getWireFormat(), uri);
365 if (embedServer) {
366 return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created);
367 }
368 return transportChannel;
369 }
370
371 public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) {
372 brokers.put(theURLString, brokerConnector);
373 }
374
375 public static synchronized void unregisterBroker(String theURLString) {
376 brokers.remove(theURLString);
377 }
378
379 protected synchronized BrokerContainer getContainer(String brokerName) throws JMSException {
380 if (container == null) {
381 container = getBrokerContainerFactory().createBrokerContainer(brokerName);
382 container.start();
383 }
384 return container;
385 }
386
387 protected BrokerConnector createBrokerConnector(String url) throws JMSException {
388 BrokerConnector brokerConnector;
389 brokerConnector = new BrokerConnectorImpl(getContainer(url), url, getWireFormat());
390 brokerConnector.start();
391
392
393 log.info("Embedded JMS Broker has started");
394 try {
395 Thread.sleep(1000);
396 }
397 catch (InterruptedException e) {
398 System.out.println("Caught: " + e);
399 e.printStackTrace();
400 }
401 return brokerConnector;
402 }
403
404
405 protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
406 ensureVmServerIsAvailable(channel, brokerConnector);
407 if (channel.isMulticast()) {
408 return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created);
409 }
410 return channel;
411 }
412
413 private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException {
414 if (channel instanceof VmTransportChannel && brokerConnector instanceof TransportChannelListener) {
415 TransportChannelListener listener = (TransportChannelListener) brokerConnector;
416 VmTransportChannel answer = (VmTransportChannel) channel;
417
418 listener.addClient(answer.createServerSide());
419 }
420 }
421
422 protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
423 if (created) {
424 BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl) brokerConnector;
425
426 BrokerClientImpl client = new BrokerClientImpl();
427 client.initialize(brokerImpl, channel);
428 channel.start();
429 String brokerClientID = createMulticastClientID();
430 channel.setClientID(brokerClientID);
431
432
433
434 ConnectionInfo info = new ConnectionInfo();
435 info.setHostName(IdGenerator.getHostName());
436 info.setClientId(brokerClientID);
437 info.setStarted(true);
438 client.consumeConnectionInfo(info);
439
440 ConsumerInfo consumerInfo = new ConsumerInfo();
441 consumerInfo.setDestination(new ActiveMQTopic(">"));
442 consumerInfo.setNoLocal(true);
443 consumerInfo.setClientId(brokerClientID);
444 consumerInfo.setConsumerId(idGenerator.generateId());
445 consumerInfo.setId(consumerInfo.getConsumerId());
446 consumerInfo.setStarted(true);
447 client.consumeConsumerInfo(consumerInfo);
448
449 consumerInfo = new ConsumerInfo();
450 consumerInfo.setDestination(new ActiveMQQueue(">"));
451 consumerInfo.setNoLocal(true);
452 consumerInfo.setClientId(brokerClientID);
453 consumerInfo.setConsumerId(idGenerator.generateId());
454 consumerInfo.setId(consumerInfo.getConsumerId());
455 consumerInfo.setStarted(true);
456 client.consumeConsumerInfo(consumerInfo);
457 }
458
459
460
461 URI localURI = createURI("vm", remoteLocation);
462 TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI);
463 ensureVmServerIsAvailable(localChannel, brokerConnector);
464 return localChannel;
465 }
466
467 /***
468 * Creates the clientID for the multicast client (used to dispatch local
469 * messages over a multicast bus)
470 */
471 protected String createMulticastClientID() {
472 return idGenerator.generateId();
473 }
474
475 protected URI createURI(String protocol, URI uri) throws JMSException {
476 try {
477 return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment());
478 }
479 catch (URISyntaxException e) {
480 JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
481 jmsEx.setLinkedException(e);
482 throw jmsEx;
483
484 }
485 }
486
487 protected URI createURI(String uri) throws JMSException {
488 try {
489 if (uri == null) {
490 throw new JMSException("The connection URI must be specified!");
491 }
492 return new URI(uri);
493 }
494 catch (URISyntaxException e) {
495 JMSException jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
496 jmsEx.setLinkedException(e);
497 throw jmsEx;
498
499 }
500 }
501
502 /***
503 * Called when a connection is closed so that we can shut down any embedded brokers cleanly
504 *
505 * @param connection
506 */
507 synchronized void onConnectionClose(ActiveMQConnection connection) throws JMSException {
508 if (--connectionCount <= 0) {
509
510 stop();
511 }
512
513 }
514
515 synchronized void onConnectionCreate(ActiveMQConnection connection) {
516 ++connectionCount;
517 }
518
519 }