001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq.broker.impl;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Set;
025    
026    import javax.jms.ExceptionListener;
027    import javax.jms.JMSException;
028    import javax.security.auth.Subject;
029    import javax.transaction.xa.XAException;
030    
031    import org.activemq.broker.BrokerAdmin;
032    import org.activemq.broker.BrokerClient;
033    import org.activemq.broker.BrokerConnector;
034    import org.activemq.io.util.SpooledBoundedActiveMQMessageQueue;
035    import org.activemq.message.ActiveMQMessage;
036    import org.activemq.message.ActiveMQXid;
037    import org.activemq.message.BrokerAdminCommand;
038    import org.activemq.message.BrokerInfo;
039    import org.activemq.message.CapacityInfo;
040    import org.activemq.message.CleanupConnectionInfo;
041    import org.activemq.message.ConnectionInfo;
042    import org.activemq.message.ConsumerInfo;
043    import org.activemq.message.DurableUnsubscribe;
044    import org.activemq.message.IntResponseReceipt;
045    import org.activemq.message.KeepAlive;
046    import org.activemq.message.MessageAck;
047    import org.activemq.message.Packet;
048    import org.activemq.message.PacketListener;
049    import org.activemq.message.ProducerInfo;
050    import org.activemq.message.Receipt;
051    import org.activemq.message.ResponseReceipt;
052    import org.activemq.message.SessionInfo;
053    import org.activemq.message.TransactionInfo;
054    import org.activemq.message.XATransactionInfo;
055    import org.activemq.transport.NetworkChannel;
056    import org.activemq.transport.NetworkConnector;
057    import org.activemq.transport.TransportChannel;
058    import org.activemq.util.IdGenerator;
059    import org.apache.commons.logging.Log;
060    import org.apache.commons.logging.LogFactory;
061    
062    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
063    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
064    import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
065    
066    /**
 * A Broker client side proxy representing a JMS Connection
068     * 
069     * @version $Revision: 1.1.1.1 $
070     */
071    public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
072        
    // General-purpose logger for this class.
    private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
    // Separate logger category used to trace each packet received from / sent to the peer.
    private static final Log commandLog = LogFactory.getLog("org.activemq.broker.CommandTrace");
    
    // Connector this client belongs to; assigned in initialize().
    private BrokerConnector brokerConnector;
    // Transport used to talk to the peer; assigned in initialize().
    private TransportChannel channel;
    // Most recent ConnectionInfo consumed from the peer; null until one arrives.
    private ConnectionInfo connectionInfo;
    // Source of the ids stamped onto outgoing packets.
    private IdGenerator packetIdGenerator;
    // Flipped to true exactly once by close(); send() becomes a no-op afterwards.
    private SynchronizedBoolean closed;
    // ConsumerInfos currently registered with the connector (guards against double registration).
    private Set activeConsumers;
    // ConsumerInfos / ProducerInfos / transaction ids / SessionInfos tracked so the
    // cleanup paths can deregister or roll them back.
    private CopyOnWriteArrayList consumers;
    private CopyOnWriteArrayList producers;
    private CopyOnWriteArrayList transactions;
    private CopyOnWriteArrayList sessions;
    // Whether the peer connection is started; also used as the monitor guarding dispatchQueue.
    private SynchronizedBoolean started;
    // True when the peer is itself another broker (see isBrokerConnection()).
    private boolean brokerConnection;
    // True when this client is part of a cluster (see isClusteredConnection()).
    private boolean clusteredConnection;
    // Name of the remote broker, recorded on packets as "visited".
    // NOTE(review): assigned outside the visible portion of the file - presumably from BrokerInfo.
    private String remoteBrokerName;
    // Peer's remaining capacity for in-progress messages (documented range 0-100); starts at 100.
    private int capacity = 100;
    // Spool for messages destined for a slow consumer; created lazily by dispatch().
    private SpooledBoundedActiveMQMessageQueue spoolQueue;
    // Set by cleanUp() so its body only runs once.
    private boolean cleanedUp;
    // True while this client is registered with the connector.
    private boolean registered;
    // Messages held back while the connection is stopped; drained when it starts.
    private ArrayList dispatchQueue = new ArrayList();
    // NOTE(review): subject and remoteNetworkConnector are not referenced in the visible
    // portion of the file - confirm their use further down.
    private Subject subject;
    private boolean remoteNetworkConnector;
097    
098        /**
099         * Default Constructor of BrokerClientImpl
100         */
101        public BrokerClientImpl() {
102            this.packetIdGenerator = new IdGenerator();
103            this.closed = new SynchronizedBoolean(false);
104            this.started = new SynchronizedBoolean(false);
105            this.activeConsumers = new HashSet();
106            this.consumers = new CopyOnWriteArrayList();
107            this.producers = new CopyOnWriteArrayList();
108            this.transactions = new CopyOnWriteArrayList();
109            this.sessions = new CopyOnWriteArrayList();
110        }
111    
112        /**
113         * Initialize the BrokerClient
114         * 
115         * @param brokerConnector
116         * @param channel
117         */
118        public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
119            this.brokerConnector = brokerConnector;
120            this.channel = channel;
121            this.channel.setPacketListener(this);
122            this.channel.setExceptionListener(this);
123            log.trace("brokerConnectorConnector client initialized");
124        }
125    
126        /**
127         * @return the BrokerConnector this client is associated with
128         */
129        public BrokerConnector getBrokerConnector() {
130            return this.brokerConnector;
131        }
132    
133        /**
134         * @return the connection information for this client
135         */
136        public ConnectionInfo getConnectionInfo() {
137            return connectionInfo;
138        }
139    
140        /**
141         * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
142         */
143        public void onException(JMSException jmsEx) {
144            log.info("Client disconnected: " + this);
145            log.debug("Disconnect cuase: ", jmsEx);
146            close();
147        }
148    
149        /**
150         * @return pretty print for this brokerConnector-client
151         */
152        public String toString() {
153            String str = "brokerConnector-client:(" + hashCode() + ") ";
154            str += connectionInfo == null ? "" : connectionInfo.getClientId();
155            str += ": " + channel;
156            return str;
157        }
158    
159        /**
160         * Dispatch an ActiveMQMessage to the end client
161         * 
162         * @param message
163         */
164        public void dispatch(ActiveMQMessage message) {
165            if (!isSlowConsumer()) {
166                dispatchToClient(message);
167            }
168            else {
169                if (spoolQueue == null) {
170                    log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
171                    String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
172                    try {
173                        spoolQueue = new SpooledBoundedActiveMQMessageQueue(brokerConnector.getBrokerContainer().getBroker()
174                                .getTempDir(), spoolName);
175                        final SpooledBoundedActiveMQMessageQueue bpq = spoolQueue;
176                        ThreadedExecutor exec = new ThreadedExecutor();
177                        exec.execute(new Runnable() {
178                            public void run() {
179                                while (!closed.get()) {
180                                    try {
181                                        Packet packet = bpq.dequeue();
182                                        if (packet != null) {
183                                            dispatchToClient(packet);
184                                        }
185                                    }
186                                    catch (InterruptedException e) {
187                                        log.warn("async dispatch got an interupt", e);
188                                    }
189                                    catch (JMSException e) {
190                                        log.error("async dispatch got an problem", e);
191                                    }
192                                }
193                            }
194                        });
195                    }
196                    catch (IOException e) {
197                        log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
198                        close();
199                    }
200                    catch (InterruptedException e) {
201                        log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
202                        close();
203                    }
204                }
205                if (spoolQueue != null) {
206                    try {
207                        spoolQueue.enqueue(message);
208                    }
209                    catch (JMSException e) {
210                        log.error(
211                                "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
212                                e);
213                        close();
214                    }
215                }
216            }
217        }
218    
219        private void dispatchToClient(Packet message) {
220            if (started.get()) {
221                send(message);
222                
223            }
224            else {
225                boolean msgSent = false;
226                if (message.isJMSMessage()) {
227                    ActiveMQMessage jmsMsg = (ActiveMQMessage) message;
228                    if (jmsMsg.getJMSActiveMQDestination().isAdvisory()) {
229                        send(message);
230                        msgSent = true;
231                    }
232                }
233                if (!msgSent) {
234                    // If the connection is stopped.. we have to hold the message till it is started.
235                    synchronized (started) {
236                        dispatchQueue.add(message);
237                    }
238                }
239            }
240        }
241    
242        /**
243         * @return true if the peer for this Client is itself another Broker
244         */
245        public boolean isBrokerConnection() {
246            return brokerConnection;
247        }
248    
249        /**
250         * @return true id this client is part of a cluster
251         */
252        public boolean isClusteredConnection() {
253            return clusteredConnection;
254        }
255    
256        /**
257         * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
258         * capacity representing that the peer cannot process any more messages at the current time
259         * 
260         * @return
261         */
262        public int getCapacity() {
263            return capacity;
264        }
265    
266        /**
267         * @return the client id of the remote connection
268         */
269        public String getClientID() {
270            if (connectionInfo != null) {
271                return connectionInfo.getClientId();
272            }
273            return null;
274        }
275    
276        /**
277         * @return the channel used
278         */
279        public TransportChannel getChannel() {
280            return channel;
281        }
282    
283        /**
284         * Get an indication if the peer should be considered as a slow consumer
285         * 
286         * @return true id the peer should be considered as a slow consumer
287         */
288        public boolean isSlowConsumer() {
289            return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing!
290        }
291    
292        /**
293         * Consume a Packet from the underlying TransportChannel for processing
294         * 
295         * @param packet
296         */
297        public void consume(Packet packet) {
298            if (packet != null) {
299                
300                if( commandLog.isDebugEnabled() )
301                    commandLog.debug("broker for "+getClientID()+" received: "+packet);
302    
303                Throwable requestEx = null;
304                boolean failed = false;
305                boolean receiptRequired = packet.isReceiptRequired();
306                short correlationId = packet.getId();
307                String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
308                String clusterName = brokerConnector.getBrokerInfo().getClusterName();
309                try {
310                    if (brokerConnection) {
311                        if (remoteBrokerName != null && remoteBrokerName.length() > 0) {
312                            packet.addBrokerVisited(remoteBrokerName); //got from the remote broker
313                        }
314                        packet.addBrokerVisited(brokerName);
315                    }
316                    if (packet.isJMSMessage()) {
317                        ActiveMQMessage message = (ActiveMQMessage) packet;
318                        
319                        if (!brokerConnection) {
320                            message.setEntryBrokerName(brokerName);
321                            message.setEntryClusterName(clusterName);
322                        }
323                        consumeActiveMQMessage(message);
324                    }
325                    else {
326                        switch (packet.getPacketType()) {
327                            case Packet.ACTIVEMQ_MSG_ACK : {
328                                MessageAck ack = (MessageAck) packet;
329                                consumeMessageAck(ack);
330                                break;
331                            }
332                            case Packet.XA_TRANSACTION_INFO : {
333                                XATransactionInfo info = (XATransactionInfo) packet;
334                                consumeXATransactionInfo(info);
335                                receiptRequired=info.isReceiptRequired();
336                                break;
337                            }
338                            case Packet.TRANSACTION_INFO : {
339                                TransactionInfo info = (TransactionInfo) packet;
340                                consumeTransactionInfo(info);
341                                break;
342                            }
343                            case Packet.CONSUMER_INFO : {
344                                ConsumerInfo info = (ConsumerInfo) packet;
345                                consumeConsumerInfo(info);
346                                break;
347                            }
348                            case Packet.PRODUCER_INFO : {
349                                ProducerInfo info = (ProducerInfo) packet;
350                                consumeProducerInfo(info);
351                                break;
352                            }
353                            case Packet.SESSION_INFO : {
354                                SessionInfo info = (SessionInfo) packet;
355                                consumeSessionInfo(info);
356                                break;
357                            }
358                            case Packet.ACTIVEMQ_CONNECTION_INFO : {
359                                ConnectionInfo info = (ConnectionInfo) packet;
360                                consumeConnectionInfo(info);
361                                break;
362                            }
363                            case Packet.DURABLE_UNSUBSCRIBE : {
364                                DurableUnsubscribe ds = (DurableUnsubscribe) packet;
365                                brokerConnector.durableUnsubscribe(this, ds);
366                                break;
367                            }
368                            case Packet.CAPACITY_INFO : {
369                                CapacityInfo info = (CapacityInfo) packet;
370                                consumeCapacityInfo(info);
371                                break;
372                            }
373                            case Packet.CAPACITY_INFO_REQUEST : {
374                                updateCapacityInfo(packet.getId());
375                                break;
376                            }
377                            case Packet.ACTIVEMQ_BROKER_INFO : {
378                                consumeBrokerInfo((BrokerInfo) packet);
379                                break;
380                            }
381                            case Packet.KEEP_ALIVE : {
382                                // Ignore as the packet contains no additional information to consume
383                                break;
384                            }
385                            case Packet.BROKER_ADMIN_COMMAND : {
386                                consumeBrokerAdminCommand((BrokerAdminCommand) packet);
387                                break;
388                            }
389                            case Packet.CLEANUP_CONNECTION_INFO : {
390                                consumeCleanupConnectionInfo((CleanupConnectionInfo) packet);
391                                break;
392                            }
393                            default : {
394                                log.warn("Unknown Packet received: " + packet);
395                                break;
396                            }
397                        }
398                    }
399                }
400                catch (Throwable e) {
401                    requestEx = e;
402                    log.warn("caught exception consuming packet: " + packet, e);
403                    failed = true;
404                }
405                if (receiptRequired){
406                    sendReceipt(correlationId, requestEx, failed);
407                }
408            }
409        }
410    
411        /**
412         * @param cleanupInfo
413         * @throws JMSException 
414         */
415        private void consumeCleanupConnectionInfo(CleanupConnectionInfo cleanupInfo) throws JMSException {
416            try {
417                
418                for (Iterator i = consumers.iterator(); i.hasNext();) {
419                    ConsumerInfo info = (ConsumerInfo) i.next();
420                    info.setStarted(false);
421                    this.brokerConnector.deregisterMessageConsumer(this, info);
422                }
423                for (Iterator i = producers.iterator(); i.hasNext();) {
424                    ProducerInfo info = (ProducerInfo) i.next();
425                    info.setStarted(false);
426                    this.brokerConnector.deregisterMessageProducer(this, info);
427                }
428                for (Iterator i = sessions.iterator(); i.hasNext();) {
429                    SessionInfo info = (SessionInfo) i.next();
430                    info.setStarted(false);
431                    this.brokerConnector.deregisterSession(this, info);
432                }
433                for (Iterator i = transactions.iterator(); i.hasNext();) {
434                    this.brokerConnector.rollbackTransaction(this, i.next().toString());
435                }
436                this.brokerConnector.deregisterClient(this, connectionInfo);
437                registered = false;
438                
439            } finally {
440                // whatever happens, lets make sure we unregister & clean things
441                // down
442                if (log.isDebugEnabled()) {
443                    log.debug(this + " has stopped");
444                }
445                this.consumers.clear();
446                this.producers.clear();
447                this.transactions.clear();
448                this.sessions.clear();
449            }
450    
451        }
452    
453        /**
454         * @param command
455         * @throws JMSException
456         */
457        private void consumeBrokerAdminCommand(BrokerAdminCommand command) throws JMSException {
458            BrokerAdmin brokerAdmin = brokerConnector.getBrokerContainer().getBroker().getBrokerAdmin();
459            if (BrokerAdminCommand.CREATE_DESTINATION.equals(command.getCommand())) {
460                brokerAdmin.createMessageContainer(command.getDestination());
461            }
462            else if (BrokerAdminCommand.DESTROY_DESTINATION.equals(command.getCommand())) {
463                brokerAdmin.destoryMessageContainer(command.getDestination());
464            }
465            else if (BrokerAdminCommand.EMPTY_DESTINATION.equals(command.getCommand())) {
466                brokerAdmin.getMessageContainerAdmin(command.getDestination()).empty();
467            }
468            else if (BrokerAdminCommand.SHUTDOWN_SERVER_VM.equals(command.getCommand())) {
469                if (Boolean.getBoolean("enable.vm.shutdown")) {
470                    log.info("processing command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM);
471                    System.exit(1);
472                } else
473                {
474                    log.warn("ignoring command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM + ", enable.vm.shutdown=false");
475                }
476            }
477            else {
478                throw new JMSException("Broker Admin Command type: " + command.getCommand() + " not recognized.");
479            }
480        }
481    
482        /**
483         * Register/deregister MessageConsumer with the Broker
484         * 
485         * @param info
486         * @throws JMSException
487         */
488        public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
489            String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
490            if (info.isStarted()) {
491                consumers.add(info);
492                if (this.activeConsumers.add(info)) {
493                    this.brokerConnector.registerMessageConsumer(this, info);
494                }
495            }
496            else {
497                consumers.remove(info);
498                if (activeConsumers.remove(info)) {
499                    this.brokerConnector.deregisterMessageConsumer(this, info);
500                }
501            }
502        }
503    
504        /**
505         * Update the peer Connection about the Broker's capacity for messages
506         * 
507         * @param capacity
508         */
509        public void updateBrokerCapacity(int capacity) {
510            CapacityInfo info = new CapacityInfo();
511            info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
512            info.setCapacity(capacity);
513            info.setFlowControlTimeout(getFlowControlTimeout(capacity));
514            send(info);
515        }
516    
517        /**
518         * register with the Broker
519         * 
520         * @param info
521         * @throws JMSException
522         */
523        public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
524            this.connectionInfo = info;
525            if (info.isClosed()) {
526                try {
527                    cleanUp();
528                    if (info.isReceiptRequired()){
529                        sendReceipt(info.getId(), null, false);
530                    }
531                    info.setReceiptRequired(false);
532                    try {
533                        Thread.sleep(500);
534                    }
535                    catch (Throwable e) {
536                    }
537                }
538                finally {
539                    close();
540                }
541            }
542            else {
543                if (!registered) {
544                    this.brokerConnector.registerClient(this, info);
545                    registered = true;
546                }
547                synchronized (started) {
548                    //set transport hint
549                    if (info.getProperties() != null && info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY) != null){
550                       boolean noDelay = new Boolean(info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY)).booleanValue();
551                       channel.setNoDelay(noDelay);
552                        
553                    }
554                    if (!started.get() && info.isStarted()) {
555                        started.set(true);
556                        // Dispatch any queued
557                        log.debug(this + " has started running client version " + info.getClientVersion()
558                                + " , wire format = " + info.getWireFormatVersion());
559                        //go through consumers, producers, and sessions - setting their clientId (which might not have been set)
560                        for (Iterator i = consumers.iterator();i.hasNext();) {
561                            ConsumerInfo ci = (ConsumerInfo) i.next();
562                            ci.setClientId(info.getClientId());
563                        }
564                        for (Iterator i = producers.iterator();i.hasNext();) {
565                            ProducerInfo pi = (ProducerInfo) i.next();
566                            pi.setClientId(info.getClientId());
567                        }
568                        for (Iterator i = sessions.iterator();i.hasNext();) {
569                            SessionInfo si = (SessionInfo) i.next();
570                            si.setClientId(info.getClientId());
571                        }
572                        for (int i = 0;i < dispatchQueue.size();i++) {
573                            ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i);
574                            dispatch(msg);
575                        }
576                        dispatchQueue.clear();
577                    }
578                    if (started.get() && !info.isStarted()) {
579                        started.set(false);
580                        log.debug(this + " has stopped");
581                    }
582                }
583            }
584        }
585    
586        /**
587         * start consuming messages
588         * 
589         * @throws JMSException
590         */
591        public void start() throws JMSException {
592            channel.start();
593        }
594    
595        /**
596         * stop consuming messages
597         * 
598         * @throws JMSException
599         */
600        public void stop() throws JMSException {
601            log.trace("Stopping channel: " + channel);
602            channel.stop();
603        }
604    
605        /**
606         * cleanup
607         */
608        public synchronized void cleanUp() {
609            // we could be called here from 2 different code paths
610            // based on if we get a transport failure or we do a clean shutdown
611            // so lets only run this stuff once
612            if (!cleanedUp) {
613                cleanedUp = true;
614                try {
615                    try {
616                        for (Iterator i = consumers.iterator();i.hasNext();) {
617                            ConsumerInfo info = (ConsumerInfo) i.next();
618                            info.setStarted(false);
619                            this.brokerConnector.deregisterMessageConsumer(this, info);
620                        }
621                        for (Iterator i = producers.iterator();i.hasNext();) {
622                            ProducerInfo info = (ProducerInfo) i.next();
623                            info.setStarted(false);
624                            this.brokerConnector.deregisterMessageProducer(this, info);
625                        }
626                        for (Iterator i = sessions.iterator();i.hasNext();) {
627                            SessionInfo info = (SessionInfo) i.next();
628                            info.setStarted(false);
629                            this.brokerConnector.deregisterSession(this, info);
630                        }
631                        for (Iterator i = transactions.iterator();i.hasNext();) {
632                            this.brokerConnector.rollbackTransaction(this, i.next().toString());
633                        }
634                    }
635                    finally {
636                        // whatever happens, lets make sure we unregister & clean things down
637                        if (log.isDebugEnabled()) {
638                            log.debug(this + " has stopped");
639                        }
640                        this.consumers.clear();
641                        this.producers.clear();
642                        this.transactions.clear();
643                        this.sessions.clear();
644                        this.brokerConnector.deregisterClient(this, connectionInfo);
645                        registered = false;
646                    }
647                }
648                catch (JMSException e) {
649                    log.warn("failed to de-register Broker client: " + e, e);
650                }
651            }
652            else {
653                log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
654            }
655        }
656    
657        // Implementation methods
658        //-------------------------------------------------------------------------
659        protected void send(Packet packet) {
660            if (!closed.get()) {
661                try {
662                    if (brokerConnection) {
663                        String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
664                        packet.addBrokerVisited(brokerName);
665                        if (packet.hasVisited(remoteBrokerName)) {
666                            if (log.isDebugEnabled()) {
667                                log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
668                                        + packet);
669                            }
670                            return;
671                        }
672                    }
673                    packet.setId(this.packetIdGenerator.getNextShortSequence());
674                    if( commandLog.isDebugEnabled() )
675                        commandLog.debug("broker for "+getClientID()+" sending: "+packet);
676                    this.channel.asyncSend(packet);
677                }
678                catch (JMSException e) {
679                    log.warn(this + " caught exception ", e);
680                    close();
681                }
682            }
683        }
684        
685        /**
686         * validate the connection
687         * @param timeout
688         * @throws JMSException
689         */
690            public void validateConnection(int timeout) throws JMSException {
691                    KeepAlive packet = new KeepAlive();
692                    packet.setReceiptRequired(true);
693                    packet.setId(this.packetIdGenerator.getNextShortSequence());
694                    // In most cases, if the transport is dead due to network errors
695                    // the network error will be recognised immediately and an exception 
696                    // thrown. If the duplicate client ids are due to misconfiguration, 
697                    // we make sure that we do not terminate the "right" connection 
698                    // prematurely by using a long timeout here. If the existing client
699                    // is working heavily and/or over a slow link, it might take some time
700                    // for it to respond. In such a case, the new client is misconfigured
701                    // and can wait for a while before being kicked out.
702    
703                    Receipt r = getChannel().send(packet, timeout);
704                    if (r == null) throw new JMSException("Client did not respond in time");
705    
706            }
707        
708        protected void close() {
709            if (closed.commit(false, true)) {
710                this.channel.stop();
711                log.debug(this + " has closed");
712            }
713        }
714    
715        /**
716         * Send message to Broker
717         * 
718         * @param message
719         * @throws JMSException
720         */
721        private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
722            this.brokerConnector.sendMessage(this, message);
723        }
724    
725        /**
726         * Send Message acknowledge to the Broker
727         * 
728         * @param ack
729         * @throws JMSException
730         */
731        private void consumeMessageAck(MessageAck ack) throws JMSException {
732            this.brokerConnector.acknowledgeMessage(this, ack);
733        }
734    
735        /**
736         * Handle transaction start/commit/rollback
737         * 
738         * @param info
739         * @throws JMSException
740         */
741        private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
742            if (info.getType() == TransactionInfo.START) {
743                transactions.add(info.getTransactionId());
744                this.brokerConnector.startTransaction(this, info.getTransactionId());
745            }
746            else {
747                if (info.getType() == TransactionInfo.ROLLBACK) {
748                    this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
749                }
750                else if (info.getType() == TransactionInfo.COMMIT) {
751                    this.brokerConnector.commitTransaction(this, info.getTransactionId());
752                }
753                transactions.remove(info.getTransactionId());
754            }
755        }
756    
757        /**
758         * Handle XA transaction start/prepare/commit/rollback
759         * 
760         * @param info
761         * @throws JMSException
762         * @throws XAException
763         */
764        private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
765            if (info.getType() == XATransactionInfo.START) {
766                this.brokerConnector.startTransaction(this, info.getXid());
767            }
768            else if (info.getType() == XATransactionInfo.XA_RECOVER) {
769                ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
770                if( info.isReceiptRequired()) {
771                    // We will be sending our own receipt..
772                    info.setReceiptRequired(false);
773                    // Send the receipt..
774                    ResponseReceipt receipt = new ResponseReceipt();
775                    receipt.setCorrelationId(info.getId());
776                    receipt.setResult(rc);
777                    send(receipt);
778                }
779            }
780            else if (info.getType() == XATransactionInfo.GET_RM_ID) {
781                String rc = this.brokerConnector.getResourceManagerId(this);
782                if( info.isReceiptRequired()) {
783                    // We will be sending our own receipt..
784                    info.setReceiptRequired(false);
785                    // Send the receipt..
786                    ResponseReceipt receipt = new ResponseReceipt();
787                    receipt.setId(this.packetIdGenerator.getNextShortSequence());
788                    receipt.setCorrelationId(info.getId());
789                    receipt.setResult(rc);
790                    send(receipt);
791                }
792            }
793            else if (info.getType() == XATransactionInfo.END) {
794                // we don't do anything..
795            }
796            else {
797                if (info.getType() == XATransactionInfo.PRE_COMMIT) {
798                    int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
799                    // We will be sending our own receipt..
800                    if( info.isReceiptRequired()) {
801                        info.setReceiptRequired(false);
802                        // Send the receipt..
803                        IntResponseReceipt receipt = new IntResponseReceipt();
804                        receipt.setId(this.packetIdGenerator.getNextShortSequence());
805                        receipt.setCorrelationId(info.getId());
806                        receipt.setResult(rc);
807                        send(receipt);
808                    }
809                }
810                else if (info.getType() == XATransactionInfo.ROLLBACK) {
811                    this.brokerConnector.rollbackTransaction(this, info.getXid());
812                }
813                else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
814                    this.brokerConnector.commitTransaction(this, info.getXid(), true);
815                }
816                else if (info.getType() == XATransactionInfo.COMMIT) {
817                    this.brokerConnector.commitTransaction(this, info.getXid(), false);
818                }
819                else {
820                    throw new JMSException("Packet type: " + info.getType() + " not recognized.");
821                }
822            }
823        }
824    
825        /**
826         * register/deregister MessageProducer in the Broker
827         * 
828         * @param info
829         * @throws JMSException
830         */
831        private void consumeProducerInfo(ProducerInfo info) throws JMSException {
832            if (info.isStarted()) {
833                producers.add(info);
834                this.brokerConnector.registerMessageProducer(this, info);
835            }
836            else {
837                producers.remove(info);
838                this.brokerConnector.deregisterMessageProducer(this, info);
839            }
840        }
841    
842        /**
843         * register/deregister Session in a Broker
844         * 
845         * @param info
846         * @throws JMSException
847         */
848        private void consumeSessionInfo(SessionInfo info) throws JMSException {
849            if (info.isStarted()) {
850                sessions.add(info);
851                this.brokerConnector.registerSession(this, info);
852            }
853            else {
854                sessions.remove(info);
855                this.brokerConnector.deregisterSession(this, info);
856            }
857        }
858    
859        /**
860         * Update capacity for the peer
861         * 
862         * @param info
863         */
864        private void consumeCapacityInfo(CapacityInfo info) {
865            this.capacity = info.getCapacity();
866        }
867    
868        private void updateCapacityInfo(short correlationId) {
869            CapacityInfo info = new CapacityInfo();
870            info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
871            info.setCorrelationId(correlationId);
872            info.setCapacity(this.brokerConnector.getBrokerCapacity());
873            info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
874            send(info);
875        }
876    
877        private long getFlowControlTimeout(int capacity) {
878            long result = -1;
879            if (capacity <= 0) {
880                result = 10000;
881            }
882            else if (capacity <= 10) {
883                result = 1000;
884            }
885            else if (capacity <= 20) {
886                result = 10;
887            }
888            return result;
889        }
890    
891        private void consumeBrokerInfo(final BrokerInfo info) {
892            brokerConnection = true;
893            started.set(true);
894            remoteBrokerName = info.getBrokerName();
895            if (remoteBrokerName == null || remoteBrokerName.length() == 0) {
896                log.warn("No remote broker name available!");
897            }
898            else {
899                if (log.isDebugEnabled()) {
900                   log.debug("Received broker info from: " + remoteBrokerName + " on client: " + channel);
901                }
902            }
903            String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
904            if (clusterName.equals(info.getClusterName())) {
905                clusteredConnection = true;
906            }
907            if (!remoteNetworkConnector && info.isRemote()) {
908                try {
909                    final NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer());
910                    networkConnector.getThreadPool().execute(new Runnable() {
911                        public void run() {
912                            try {
913                                NetworkChannel networkChannel = new NetworkChannel(networkConnector, brokerConnector
914                                        .getBrokerContainer(), channel, info.getBrokerName(), info.getClusterName());
915                                networkConnector.addNetworkChannel(networkChannel);
916                                brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector);
917                                networkConnector.start();
918                            }
919                            catch (JMSException e) {
920                                log.error("Failed to create reverse remote channel", e);
921                            }
922                        }
923                    });
924                    log.info("Started reverse remote channel to " + remoteBrokerName);
925                    remoteNetworkConnector = true;
926                }
927                catch (InterruptedException e) {
928                    log.error("Failed to create reverse remote channel", e);
929                }
930            }
931        }
932    
933    
934        private void sendReceipt(short correlationId, Throwable requestEx, boolean failed) {
935            Receipt receipt = new Receipt();
936            receipt.setCorrelationId(correlationId);
937            receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
938            receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
939            receipt.setException(requestEx);
940            receipt.setFailed(failed);
941            send(receipt);
942        }
943    
944        /**
945         * @param subject
946         */
947        public void setSubject(Subject subject) {
948            this.subject = subject;
949        }
950    
951        /**
952         * @return the subject
953         */
954        public Subject getSubject() {
955            return subject;
956        }
957    }