1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.broker.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.broker.BrokerClient;
23 import org.codehaus.activemq.broker.BrokerConnector;
24 import org.codehaus.activemq.broker.BrokerContainer;
25 import org.codehaus.activemq.message.*;
26 import org.codehaus.activemq.transport.TransportChannel;
27 import org.codehaus.activemq.transport.TransportChannelListener;
28 import org.codehaus.activemq.transport.TransportServerChannel;
29 import org.codehaus.activemq.transport.TransportServerChannelProvider;
30
31 import javax.jms.JMSException;
32 import javax.jms.JMSSecurityException;
33 import javax.transaction.xa.XAException;
34 import java.net.URI;
35 import java.net.URISyntaxException;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.Map;
39
40 /***
41 * An implementation of the broker (the JMS server)
42 *
43 * @version $Revision: 1.8 $
44 */
45 public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
46 private BrokerInfo brokerInfo;
47
48 private TransportServerChannel serverChannel;
49 private Log log;
50 private BrokerContainer container;
51 private Map clients = Collections.synchronizedMap(new HashMap());
52
53 /***
54 * Helper constructor for TCP protocol with the given bind address
55 *
56 * @param container
57 * @param bindAddress
58 * @throws JMSException
59 */
60 public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
61 this(container, createTransportServerChannel(wireFormat, bindAddress));
62 }
63
64 /***
65 * @param container
66 * @param serverChannel
67 */
68 public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
69 assert container != null;
70 this.brokerInfo = new BrokerInfo();
71 this.brokerInfo.setBrokerName(container.getBroker().getBrokerName());
72 this.log = LogFactory.getLog(getClass().getName());
73 this.serverChannel = serverChannel;
74 this.container = container;
75 this.container.addConnector(this);
76 serverChannel.setTransportChannelListener(this);
77 }
78
79 /***
80 * @return infomation about the Broker
81 */
82 public BrokerInfo getBrokerInfo() {
83 return brokerInfo;
84 }
85
86 /***
87 * Get a hint about the broker capacity for more messages
88 *
89 * @return percentage value (0-100) about how much capacity the
90 * broker has
91 */
92 public int getBrokerCapacity() {
93 return container.getBroker().getRoundedCapacity();
94 }
95
96 /***
97 * @return Get the server channel
98 */
99 public TransportServerChannel getServerChannel() {
100 return serverChannel;
101 }
102
103 /***
104 * start the Broker
105 *
106 * @throws JMSException
107 */
108 public void start() throws JMSException {
109 this.serverChannel.start();
110 log.info("ActiveMQ connector started: " + this);
111 }
112
113 /***
114 * Stop the Broker
115 *
116 * @throws JMSException
117 */
118 public void stop() throws JMSException {
119 this.serverChannel.stop();
120 this.container.removeConnector(this);
121 log.info("ActiveMQ connector stopped: " + this);
122 }
123
124 /***
125 * Register a Broker Client
126 *
127 * @param client
128 * @param info contains infomation about the Connection this Client represents
129 * @throws JMSException
130 * @throws javax.jms.InvalidClientIDException
131 * if the JMS client specifies an invalid or duplicate client ID.
132 * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
133 */
134 public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
135 this.container.registerConnection(client, info);
136 }
137
138 /***
139 * Deregister a Broker Client
140 *
141 * @param client
142 * @param info
143 * @throws JMSException if some internal error occurs
144 */
145 public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
146 this.container.deregisterConnection(client, info);
147 }
148
149 /***
150 * Registers a MessageConsumer
151 *
152 * @param client
153 * @param info
154 * @throws JMSException
155 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
156 */
157 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
158 if (info.getDestination() == null) {
159 throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
160 }
161 this.container.registerMessageConsumer(client, info);
162
163 }
164
165 /***
166 * De-register a MessageConsumer from the Broker
167 *
168 * @param client
169 * @param info
170 * @throws JMSException
171 */
172 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
173 this.container.deregisterMessageConsumer(client, info);
174 }
175
176 /***
177 * Registers a MessageProducer
178 *
179 * @param client
180 * @param info
181 * @throws JMSException
182 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
183 */
184 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
185 this.container.registerMessageProducer(client, info);
186 }
187
188 /***
189 * De-register a MessageProducer from the Broker
190 *
191 * @param client
192 * @param info
193 * @throws JMSException
194 */
195 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
196 this.container.deregisterMessageProducer(client, info);
197 }
198
199 /***
200 * Register a client-side Session (used for Monitoring)
201 *
202 * @param client
203 * @param info
204 * @throws JMSException
205 */
206 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
207 this.container.registerSession(client, info);
208 }
209
210 /***
211 * De-register a client-side Session from the Broker (used for monitoring)
212 *
213 * @param client
214 * @param info
215 * @throws JMSException
216 */
217 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
218 this.container.deregisterSession(client, info);
219 }
220
221 /***
222 * Start a transaction from the Client session
223 *
224 * @param client
225 * @param transactionId
226 * @throws JMSException
227 */
228 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
229 this.container.startTransaction(client, transactionId);
230 }
231
232 /***
233 * Rollback a transacton
234 *
235 * @param client
236 * @param transactionId
237 * @throws JMSException
238 */
239 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
240 this.container.rollbackTransaction(client, transactionId);
241 }
242
243 /***
244 * Commit a transaction
245 *
246 * @param client
247 * @param transactionId
248 * @throws JMSException
249 */
250 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
251 this.container.commitTransaction(client, transactionId);
252 }
253
254 /***
255 * send message with a transaction context
256 *
257 * @param client
258 * @param transactionId
259 * @param message
260 * @throws JMSException
261 */
262 public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
263 throws JMSException {
264 this.container.sendTransactedMessage(client, transactionId, message);
265 }
266
267 /***
268 * Acknowledge receipt of a message within a transaction context
269 *
270 * @param client
271 * @param transactionId
272 * @param ack
273 * @throws JMSException
274 */
275 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
276 throws JMSException {
277 this.container.acknowledgeTransactedMessage(client, transactionId, ack);
278 }
279
280 /***
281 * Send a non-transacted message to the Broker
282 *
283 * @param client
284 * @param message
285 * @throws JMSException
286 */
287 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
288 this.container.sendMessage(client, message);
289 }
290
291 /***
292 * Acknowledge reciept of a message
293 *
294 * @param client
295 * @param ack
296 * @throws JMSException
297 */
298 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
299 this.container.acknowledgeMessage(client, ack);
300 }
301
302 /***
303 * Command to delete a durable topic subscription
304 *
305 * @param client
306 * @param ds
307 * @throws JMSException
308 */
309 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
310 this.container.durableUnsubscribe(client, ds);
311 }
312
313
314 /***
315 * @param channel - client to add
316 */
317 public void addClient(TransportChannel channel) {
318 try {
319 BrokerClient client = new BrokerClientImpl();
320 client.initialize(this, channel);
321 if (log.isDebugEnabled()) {
322 log.debug("Starting new client: " + client);
323 }
324 channel.start();
325 clients.put(channel, client);
326 }
327 catch (JMSException e) {
328 log.error("Failed to add client due to: " + e, e);
329 }
330 }
331
332 /***
333 * @param channel - client to remove
334 */
335 public void removeClient(TransportChannel channel) {
336 BrokerClient client = (BrokerClient) clients.remove(channel);
337 if (client != null) {
338 if (log.isDebugEnabled()) {
339 log.debug("Client leaving client: " + client);
340 }
341
342
343 client.cleanUp();
344 }
345 else {
346
347 log.warn("No such client for channel: " + channel);
348 }
349 }
350
351 /***
352 * @return the BrokerContainer for this Connector
353 */
354 public BrokerContainer getBrokerContainer() {
355 return this.container;
356 }
357
358
359
360
361 /***
362 * Factory method ot create a transport channel
363 *
364 * @param bindAddress
365 * @return @throws JMSException
366 * @throws JMSException
367 */
368 protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
369 URI url;
370 try {
371 url = new URI(bindAddress);
372 }
373 catch (URISyntaxException e) {
374 JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
375 jmsEx.setLinkedException(e);
376 throw jmsEx;
377 }
378 return TransportServerChannelProvider.create(wireFormat, url);
379 }
380
381 /***
382 * Start an XA transaction.
383 *
384 * @see org.codehaus.activemq.broker.BrokerConnector#startTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
385 */
386 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
387 this.container.startTransaction(client, xid);
388 }
389
390 /***
391 * Gets the prepared XA transactions.
392 *
393 * @see org.codehaus.activemq.broker.BrokerConnector#getPreparedTransactions(org.codehaus.activemq.broker.BrokerClient)
394 */
395 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
396 return this.container.getPreparedTransactions(client);
397 }
398
399 /***
400 * Prepare an XA transaction.
401 *
402 * @see org.codehaus.activemq.broker.BrokerConnector#prepareTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
403 */
404 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
405 return this.container.prepareTransaction(client, xid);
406 }
407
408 /***
409 * Rollback an XA transaction.
410 *
411 * @see org.codehaus.activemq.broker.BrokerConnector#rollbackTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
412 */
413 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
414 this.container.rollbackTransaction(client, xid);
415 }
416
417 /***
418 * Commit an XA transaction.
419 *
420 * @see org.codehaus.activemq.broker.BrokerConnector#commitTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid, boolean)
421 */
422 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
423 this.container.commitTransaction(client, xid, onePhase);
424 }
425
426 /***
427 * @see org.codehaus.activemq.broker.BrokerConnector#getResourceManagerId(org.codehaus.activemq.broker.BrokerClient)
428 */
429 public String getResourceManagerId(BrokerClient client) {
430
431 return getBrokerInfo().getBrokerName();
432 }
433
434 }