1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.broker.impl;
20
21 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.codehaus.activemq.broker.BrokerClient;
27 import org.codehaus.activemq.broker.BrokerConnector;
28 import org.codehaus.activemq.message.*;
29 import org.codehaus.activemq.message.util.BoundedPacketQueue;
30 import org.codehaus.activemq.message.util.SpooledBoundedPacketQueue;
31 import org.codehaus.activemq.transport.TransportChannel;
32 import org.codehaus.activemq.util.IdGenerator;
33
34 import javax.jms.ExceptionListener;
35 import javax.jms.JMSException;
36 import javax.transaction.xa.XAException;
37 import java.io.IOException;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.Set;
41
42 /***
43 * A Broker client side proxy representing a JMS Connnection
44 *
45 * @version $Revision: 1.16 $
46 */
47 public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
48 private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
49 private BrokerConnector brokerConnector;
50 private TransportChannel channel;
51 private ConnectionInfo connectionInfo;
52 private IdGenerator packetIdGenerator;
53 private SynchronizedBoolean closed;
54 private Set activeConsumers;
55 private CopyOnWriteArrayList consumers;
56 private CopyOnWriteArrayList producers;
57 private CopyOnWriteArrayList transactions;
58 private CopyOnWriteArrayList xatransactions;
59 private CopyOnWriteArrayList sessions;
60 private boolean started;
61 private boolean brokerConnection;
62 private int capacity = 100;
63 private SpooledBoundedPacketQueue spoolQueue;
64 private boolean cleanedUp;
65
66 /***
67 * Default Constructor of BrokerClientImpl
68 */
69 public BrokerClientImpl() {
70 this.packetIdGenerator = new IdGenerator();
71 this.closed = new SynchronizedBoolean(false);
72 this.activeConsumers = new HashSet();
73 this.consumers = new CopyOnWriteArrayList();
74 this.producers = new CopyOnWriteArrayList();
75 this.transactions = new CopyOnWriteArrayList();
76 this.xatransactions = new CopyOnWriteArrayList();
77 this.sessions = new CopyOnWriteArrayList();
78 }
79
80 /***
81 * Initialize the BrokerClient
82 *
83 * @param brokerConnector
84 * @param channel
85 */
86 public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
87 this.brokerConnector = brokerConnector;
88 this.channel = channel;
89 this.channel.setPacketListener(this);
90 this.channel.setExceptionListener(this);
91 log.trace("brokerConnectorConnector client initialized");
92 }
93
94 /***
95 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
96 */
97 public void onException(JMSException jmsEx) {
98 log.warn(this + " caught exception ", jmsEx);
99 close();
100 }
101
102 /***
103 * @return pretty print for this brokerConnector-client
104 */
105 public String toString() {
106 String str = "brokerConnector-client: ";
107 str += connectionInfo == null ? "" : connectionInfo.getClientId();
108 str += ": " + channel;
109 return str;
110 }
111
112 /***
113 * Dispatch an ActiveMQMessage to the end client
114 *
115 * @param message
116 */
117 public void dispatch(ActiveMQMessage message) {
118 if (isSlowConsumer()) {
119 if (spoolQueue == null) {
120 log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
121 String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
122 try {
123 spoolQueue = new SpooledBoundedPacketQueue(brokerConnector.getBrokerContainer().getBroker()
124 .getTempDir(), spoolName);
125 final BoundedPacketQueue bpq = spoolQueue;
126 ThreadedExecutor exec = new ThreadedExecutor();
127 exec.execute(new Runnable() {
128 public void run() {
129 while (!closed.get()) {
130 try {
131 Packet packet = bpq.dequeue();
132 }
133 catch (InterruptedException e) {
134 log.warn("async dispatch got an interupt", e);
135 }
136 catch (JMSException e) {
137 log.error("async dispatch got an problem", e);
138 }
139 }
140 }
141 });
142 }
143 catch (IOException e) {
144 log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
145 close();
146 }
147 catch (InterruptedException e) {
148 log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
149 close();
150 }
151 }
152 if (spoolQueue != null) {
153 try {
154 spoolQueue.enqueue(message);
155 }
156 catch (JMSException e) {
157 log.error("Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
158 e);
159 close();
160 }
161 }
162 }
163 else {
164 send(message);
165 }
166 }
167
168 /***
169 * @return true if the peer for this Client is itself another Broker
170 */
171 public boolean isBrokerConnection() {
172 return brokerConnection;
173 }
174
175 /***
176 * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
177 * capacity representing that the peer cannot process any more messages at the current time
178 *
179 * @return
180 */
181 public int getCapacity() {
182 return capacity;
183 }
184
185 public String getClientID() {
186 if (connectionInfo != null) {
187 return connectionInfo.getClientId();
188 }
189 return null;
190 }
191
192 public TransportChannel getChannel() {
193 return channel;
194 }
195
196 /***
197 * Get an indication if the peer should be considered as a slow consumer
198 *
199 * @return true id the peer should be considered as a slow consumer
200 */
201 public boolean isSlowConsumer() {
202 return capacity <= 20;
203 }
204
205 /***
206 * Consume a Packet from the underlying TransportChannel for processing
207 *
208 * @param packet
209 */
210 public void consume(Packet packet) {
211 if (!closed.get()) {
212 Throwable requestEx = null;
213 boolean failed = false;
214 try {
215 if (packet.isJMSMessage()) {
216 ActiveMQMessage message = (ActiveMQMessage) packet;
217
218
219 if (connectionInfo != null) {
220 message.setProducerID(connectionInfo.getClientId());
221 }
222 else {
223 log.warn("No connection info available! Maybe the client forgot to start() the Connection?");
224 }
225 consumeActiveMQMessage(message);
226 }
227 else {
228 switch (packet.getPacketType()) {
229 case Packet.ACTIVEMQ_MSG_ACK:
230 {
231 MessageAck ack = (MessageAck) packet;
232 consumeMessageAck(ack);
233 break;
234 }
235 case Packet.XA_TRANSACTION_INFO:
236 {
237 XATransactionInfo info = (XATransactionInfo) packet;
238 consumeXATransactionInfo(info);
239 break;
240 }
241 case Packet.TRANSACTION_INFO:
242 {
243 TransactionInfo info = (TransactionInfo) packet;
244 consumeTransactionInfo(info);
245 break;
246 }
247 case Packet.CONSUMER_INFO:
248 {
249 ConsumerInfo info = (ConsumerInfo) packet;
250 consumeConsumerInfo(info);
251 break;
252 }
253 case Packet.PRODUCER_INFO:
254 {
255 ProducerInfo info = (ProducerInfo) packet;
256 consumeProducerInfo(info);
257 break;
258 }
259 case Packet.SESSION_INFO:
260 {
261 SessionInfo info = (SessionInfo) packet;
262 consumeSessionInfo(info);
263 break;
264 }
265 case Packet.ACTIVEMQ_CONNECTION_INFO:
266 {
267 ConnectionInfo info = (ConnectionInfo) packet;
268 consumeConnectionInfo(info);
269 break;
270 }
271 case Packet.DURABLE_UNSUBSCRIBE:
272 {
273 DurableUnsubscribe ds = (DurableUnsubscribe) packet;
274 brokerConnector.durableUnsubscribe(this, ds);
275 break;
276 }
277 case Packet.CAPACITY_INFO:
278 {
279 CapacityInfo info = (CapacityInfo) packet;
280 consumeCapacityInfo(info);
281 break;
282 }
283 case Packet.CAPACITY_INFO_REQUEST:
284 {
285 updateCapacityInfo(packet.getId());
286 break;
287 }
288 default :
289 {
290 log.warn("Unknown Packet received: " + packet);
291 break;
292 }
293 }
294 }
295 }
296 catch (Throwable e) {
297 requestEx = e;
298 e.printStackTrace();
299 log.info("caught exception consuming packet: " + packet, e);
300 failed = true;
301 }
302 sendReceipt(packet, requestEx, failed);
303 }
304 }
305
306
307 /***
308 * Register/deregister MessageConsumer with the Broker
309 *
310 * @param info
311 * @throws JMSException
312 */
313 public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
314 if (info.isStarted()) {
315 consumers.add(info);
316 if ((connectionInfo != null && connectionInfo.isStarted())) {
317 if (this.activeConsumers.add(info)) {
318 this.brokerConnector.registerMessageConsumer(this, info);
319 }
320 }
321 }
322 else {
323 consumers.remove(info);
324 if (activeConsumers.remove(info)) {
325 this.brokerConnector.deregisterMessageConsumer(this, info);
326 }
327 }
328 }
329
330 /***
331 * Update the peer Connection about the Broker's capacity for messages
332 *
333 * @param capacity
334 */
335 public void updateBrokerCapacity(int capacity) {
336 CapacityInfo info = new CapacityInfo();
337 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
338 info.setCapacity(capacity);
339 info.setFlowControlTimeout(getFlowControlTimeout(capacity));
340 send(info);
341 }
342
343 /***
344 * register with the Broker
345 *
346 * @param info
347 * @throws JMSException
348 */
349 public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
350 this.connectionInfo = info;
351 if (info.isClosed()) {
352 cleanUp();
353 try {
354 sendReceipt(info);
355 info.setReceiptRequired(false);
356 try {
357 Thread.sleep(500);
358 }
359 catch (Throwable e) {
360 }
361 }
362 finally {
363 close();
364 }
365 }
366 else {
367 if (!started && info.isStarted()) {
368 started = true;
369 if (log.isDebugEnabled()) {
370 log.debug(this + " has started");
371 }
372 this.brokerConnector.registerClient(this, info);
373
374 for (Iterator i = consumers.iterator(); i.hasNext();) {
375 ConsumerInfo ci = (ConsumerInfo) i.next();
376 ci.setClientId(info.getClientId());
377 }
378 for (Iterator i = producers.iterator(); i.hasNext();) {
379 ProducerInfo pi = (ProducerInfo) i.next();
380 pi.setClientId(info.getClientId());
381 }
382 }
383 if (info.isStarted()) {
384
385 for (Iterator i = consumers.iterator(); i.hasNext();) {
386 ConsumerInfo ci = (ConsumerInfo) i.next();
387 if (activeConsumers.add(ci)) {
388 this.brokerConnector.registerMessageConsumer(this, ci);
389 }
390 }
391 }
392 else {
393 log.debug(this + " has stopped");
394
395 for (Iterator i = consumers.iterator(); i.hasNext();) {
396 ConsumerInfo ci = (ConsumerInfo) i.next();
397 if (activeConsumers.remove(ci)) {
398 this.brokerConnector.deregisterMessageConsumer(this, ci);
399 }
400 }
401 }
402 }
403 }
404
405 /***
406 * start consuming messages
407 *
408 * @throws JMSException
409 */
410 public void start() throws JMSException {
411 channel.start();
412 }
413
414 /***
415 * stop consuming messages
416 *
417 * @throws JMSException
418 */
419 public void stop() throws JMSException {
420 log.trace("Stopping channel: " + channel);
421 channel.stop();
422 }
423
424 public synchronized void cleanUp() {
425
426
427
428 if (!cleanedUp) {
429 cleanedUp = true;
430 try {
431 try {
432 for (Iterator i = consumers.iterator(); i.hasNext();) {
433 ConsumerInfo info = (ConsumerInfo) i.next();
434 info.setStarted(false);
435 this.brokerConnector.deregisterMessageConsumer(this, info);
436 }
437 for (Iterator i = producers.iterator(); i.hasNext();) {
438 ProducerInfo info = (ProducerInfo) i.next();
439 info.setStarted(false);
440 this.brokerConnector.deregisterMessageProducer(this, info);
441 }
442 for (Iterator i = sessions.iterator(); i.hasNext();) {
443 SessionInfo info = (SessionInfo) i.next();
444 info.setStarted(false);
445 this.brokerConnector.deregisterSession(this, info);
446 }
447 for (Iterator i = transactions.iterator(); i.hasNext();) {
448 this.brokerConnector.rollbackTransaction(this, i.next().toString());
449 }
450 for (Iterator i = xatransactions.iterator(); i.hasNext();) {
451 try {
452 this.brokerConnector.rollbackTransaction(this, (ActiveMQXid) i.next());
453 }
454 catch (XAException e) {
455 log.warn("Transaction rollback failed:", e);
456 }
457 }
458 }
459 finally {
460
461 if (log.isDebugEnabled()) {
462 log.info(this + " has stopped");
463 }
464 this.consumers.clear();
465 this.producers.clear();
466 this.transactions.clear();
467 this.xatransactions.clear();
468 this.sessions.clear();
469 this.brokerConnector.deregisterClient(this, connectionInfo);
470 }
471 }
472 catch (JMSException e) {
473 log.warn("failed to de-register Broker client: " + e, e);
474 }
475 }
476 else {
477 log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
478 }
479 }
480
481
482
483
484
485 protected void send(Packet packet) {
486 if (!closed.get()) {
487 try {
488 this.channel.asyncSend(packet);
489 }
490 catch (JMSException e) {
491 log.warn(this + " caught exception ", e);
492 close();
493 }
494 }
495 }
496
497 protected void close() {
498 if (closed.commit(false, true)) {
499 this.channel.stop();
500 }
501 }
502
503 /***
504 * Send message to Broker
505 *
506 * @param message
507 * @throws JMSException
508 */
509 private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
510 if (message.isPartOfTransaction()) {
511 this.brokerConnector.sendTransactedMessage(this, message.getTransactionId(), message);
512 }
513 else {
514 this.brokerConnector.sendMessage(this, message);
515 }
516 }
517
518 /***
519 * Send Message acknowledge to the Broker
520 *
521 * @param ack
522 * @throws JMSException
523 */
524 private void consumeMessageAck(MessageAck ack) throws JMSException {
525 if (ack.isPartOfTransaction()) {
526 this.brokerConnector.acknowledgeTransactedMessage(this, ack.getTransactionId(), ack);
527 }
528 else {
529 this.brokerConnector.acknowledgeMessage(this, ack);
530 }
531 }
532
533 /***
534 * Handle transaction start/commit/rollback
535 *
536 * @param info
537 * @throws JMSException
538 */
539 private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
540 if (info.getType() == TransactionInfo.START) {
541 transactions.add(info.getTransactionId());
542 this.brokerConnector.startTransaction(this, info.getTransactionId());
543 }
544 else {
545 if (info.getType() == TransactionInfo.ROLLBACK) {
546 this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
547 }
548 else if (info.getType() == TransactionInfo.COMMIT) {
549 this.brokerConnector.commitTransaction(this, info.getTransactionId());
550 }
551 transactions.remove(info.getTransactionId());
552 }
553 }
554
555 /***
556 * Handle XA transaction start/prepare/commit/rollback
557 *
558 * @param info
559 * @throws JMSException
560 * @throws XAException
561 */
562 private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
563 if (info.getType() == XATransactionInfo.START) {
564 transactions.add(info.getXid());
565 this.brokerConnector.startTransaction(this, info.getXid());
566 }
567 else if (info.getType() == XATransactionInfo.XA_RECOVER) {
568 ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
569
570 info.setReceiptRequired(false);
571
572 ResponseReceipt receipt = new ResponseReceipt();
573 receipt.setId(this.packetIdGenerator.generateId());
574 receipt.setCorrelationId(info.getId());
575 receipt.setResult(rc);
576 send(receipt);
577 }
578 else if (info.getType() == XATransactionInfo.GET_RM_ID) {
579 String rc = this.brokerConnector.getResourceManagerId(this);
580
581 info.setReceiptRequired(false);
582
583 ResponseReceipt receipt = new ResponseReceipt();
584 receipt.setId(this.packetIdGenerator.generateId());
585 receipt.setCorrelationId(info.getId());
586 receipt.setResult(rc);
587 send(receipt);
588 }
589 else if (info.getType() == XATransactionInfo.END) {
590
591 }
592 else {
593 if (info.getType() == XATransactionInfo.PRE_COMMIT) {
594 int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
595
596 info.setReceiptRequired(false);
597
598 IntResponseReceipt receipt = new IntResponseReceipt();
599 receipt.setId(this.packetIdGenerator.generateId());
600 receipt.setCorrelationId(info.getId());
601 receipt.setResult(rc);
602 send(receipt);
603 }
604 else if (info.getType() == XATransactionInfo.ROLLBACK) {
605 this.brokerConnector.rollbackTransaction(this, info.getXid());
606 }
607 else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
608 this.brokerConnector.commitTransaction(this, info.getXid(), true);
609 }
610 else if (info.getType() == XATransactionInfo.COMMIT) {
611 this.brokerConnector.commitTransaction(this, info.getXid(), false);
612 }
613 else {
614 throw new JMSException("Packet type: " + info.getType() + " not recognized.");
615 }
616 transactions.remove(info.getXid());
617 }
618 }
619
620 /***
621 * register/deregister MessageProducer in the Broker
622 *
623 * @param info
624 * @throws JMSException
625 */
626 private void consumeProducerInfo(ProducerInfo info) throws JMSException {
627 if (info.isStarted()) {
628 producers.add(info);
629 this.brokerConnector.registerMessageProducer(this, info);
630 }
631 else {
632 producers.remove(info);
633 this.brokerConnector.deregisterMessageProducer(this, info);
634 }
635 }
636
637 /***
638 * register/deregister Session in a Broker
639 *
640 * @param info
641 * @throws JMSException
642 */
643 private void consumeSessionInfo(SessionInfo info) throws JMSException {
644 if (info.isStarted()) {
645 sessions.add(info);
646 this.brokerConnector.registerSession(this, info);
647 }
648 else {
649 sessions.remove(info);
650 this.brokerConnector.deregisterSession(this, info);
651 }
652 }
653
654
655 /***
656 * Update capacity for the peer
657 *
658 * @param info
659 */
660 private void consumeCapacityInfo(CapacityInfo info) {
661 this.capacity = info.getCapacity();
662 }
663
664 private void updateCapacityInfo(String correlationId) {
665 CapacityInfo info = new CapacityInfo();
666 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
667 info.setCorrelationId(correlationId);
668 info.setCapacity(this.brokerConnector.getBrokerCapacity());
669 info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
670 send(info);
671 }
672
673 private long getFlowControlTimeout(int capacity) {
674 long result = -1;
675 if (capacity <= 0) {
676 result = 10000;
677 }
678 else if (capacity <= 10) {
679 result = 1000;
680 }
681 else if (capacity <= 20) {
682 result = 10;
683 }
684 return result;
685 }
686
687 private void sendReceipt(Packet packet) {
688 sendReceipt(packet, null, false);
689 }
690
691 private void sendReceipt(Packet packet, Throwable requestEx, boolean failed) {
692 if (packet.isReceiptRequired()) {
693 Receipt receipt = new Receipt();
694 receipt.setId(this.packetIdGenerator.generateId());
695 receipt.setCorrelationId(packet.getId());
696 receipt.setException(requestEx);
697 receipt.setFailed(failed);
698 send(receipt);
699 }
700 }
701 }