001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.broker.impl;
019    
020    import org.activemq.broker.BrokerClient;
021    import org.activemq.broker.BrokerConnector;
022    import org.activemq.broker.BrokerContainer;
023    import org.activemq.io.WireFormat;
024    import org.activemq.message.ActiveMQMessage;
025    import org.activemq.message.ActiveMQXid;
026    import org.activemq.message.BrokerInfo;
027    import org.activemq.message.ConnectionInfo;
028    import org.activemq.message.ConsumerInfo;
029    import org.activemq.message.DurableUnsubscribe;
030    import org.activemq.message.MessageAck;
031    import org.activemq.message.ProducerInfo;
032    import org.activemq.message.SessionInfo;
033    import org.activemq.transport.TransportChannel;
034    import org.activemq.transport.TransportChannelListener;
035    import org.activemq.transport.TransportServerChannel;
036    import org.activemq.transport.TransportServerChannelProvider;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    import javax.jms.JMSException;
041    import javax.jms.JMSSecurityException;
042    import javax.transaction.xa.XAException;
043    import java.net.URI;
044    import java.net.URISyntaxException;
045    import java.util.Collections;
046    import java.util.HashMap;
047    import java.util.Map;
048    
049    /**
050     * An implementation of the broker (the JMS server)
051     *
052     * @version $Revision: 1.1.1.1 $
053     */
054    public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
055        private BrokerInfo brokerInfo;
056    
057        private TransportServerChannel serverChannel;
058        private Log log;
059        private BrokerContainer container;
060        private Map clients = Collections.synchronizedMap(new HashMap());
061    
062        /**
063         * Helper constructor for TCP protocol with the given bind address
064         *
065         * @param container
066         * @param bindAddress
067         * @throws JMSException
068         */
069        public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
070            this(container, createTransportServerChannel(wireFormat, bindAddress));
071        }
072    
073        /**
074         * @param container
075         * @param serverChannel
076         */
077        public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
078            assert container != null;
079            this.brokerInfo = new BrokerInfo();
080            this.brokerInfo.setBrokerName(container.getBroker().getBrokerName());
081            this.brokerInfo.setClusterName(container.getBroker().getBrokerClusterName());
082            this.log = LogFactory.getLog(getClass().getName());
083            this.serverChannel = serverChannel;
084            this.container = container;
085            this.container.addConnector(this);
086            serverChannel.setTransportChannelListener(this);
087        }
088    
089        /**
090         * @return infomation about the Broker
091         */
092        public BrokerInfo getBrokerInfo() {
093            return brokerInfo;
094        }
095    
096        /**
097         * Get a hint about the broker capacity for more messages
098         *
099         * @return percentage value (0-100) about how much capacity the
100         *         broker has
101         */
102        public int getBrokerCapacity() {
103            return container.getBroker().getRoundedCapacity();
104        }
105    
106        /**
107         * @return Get the server channel
108         */
109        public TransportServerChannel getServerChannel() {
110            return serverChannel;
111        }
112    
113        /**
114         * start the Broker
115         *
116         * @throws JMSException
117         */
118        public void start() throws JMSException {
119            this.serverChannel.start();
120            log.info("ActiveMQ connector started: " + serverChannel);
121        }
122    
123        /**
124         * Stop the Broker
125         *
126         * @throws JMSException
127         */
128        public void stop() throws JMSException {
129            this.container.removeConnector(this);
130            this.serverChannel.stop();
131            log.info("ActiveMQ connector stopped: " + serverChannel);
132        }
133    
134        /**
135         * Register a Broker Client
136         *
137         * @param client
138         * @param info   contains infomation about the Connection this Client represents
139         * @throws JMSException
140         * @throws javax.jms.InvalidClientIDException
141         *                              if the JMS client specifies an invalid or duplicate client ID.
142         * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
143         */
144        public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
145            this.container.registerConnection(client, info);
146        }
147    
148        /**
149         * Deregister a Broker Client
150         *
151         * @param client
152         * @param info
153         * @throws JMSException if some internal error occurs
154         */
155        public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
156            this.container.deregisterConnection(client, info);
157        }
158    
159        /**
160         * Registers a MessageConsumer
161         *
162         * @param client
163         * @param info
164         * @throws JMSException
165         * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
166         */
167        public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
168            if (info.getDestination() == null) {
169                throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
170            }
171            this.container.registerMessageConsumer(client, info);
172    
173        }
174    
175        /**
176         * De-register a MessageConsumer from the Broker
177         *
178         * @param client
179         * @param info
180         * @throws JMSException
181         */
182        public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
183            this.container.deregisterMessageConsumer(client, info);
184        }
185    
186        /**
187         * Registers a MessageProducer
188         *
189         * @param client
190         * @param info
191         * @throws JMSException
192         * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
193         */
194        public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
195            this.container.registerMessageProducer(client, info);
196        }
197    
198        /**
199         * De-register a MessageProducer from the Broker
200         *
201         * @param client
202         * @param info
203         * @throws JMSException
204         */
205        public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
206            this.container.deregisterMessageProducer(client, info);
207        }
208    
209        /**
210         * Register a client-side Session (used for Monitoring)
211         *
212         * @param client
213         * @param info
214         * @throws JMSException
215         */
216        public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
217            this.container.registerSession(client, info);
218        }
219    
220        /**
221         * De-register a client-side Session from the Broker (used for monitoring)
222         *
223         * @param client
224         * @param info
225         * @throws JMSException
226         */
227        public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
228            this.container.deregisterSession(client, info);
229        }
230    
231        /**
232         * Start a transaction from the Client session
233         *
234         * @param client
235         * @param transactionId
236         * @throws JMSException
237         */
238        public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
239            this.container.startTransaction(client, transactionId);
240        }
241    
242        /**
243         * Rollback a transacton
244         *
245         * @param client
246         * @param transactionId
247         * @throws JMSException
248         */
249        public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
250            this.container.rollbackTransaction(client, transactionId);
251        }
252    
253        /**
254         * Commit a transaction
255         *
256         * @param client
257         * @param transactionId
258         * @throws JMSException
259         */
260        public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
261            this.container.commitTransaction(client, transactionId);
262        }
263    
264        /**
265         * Send a non-transacted message to the Broker
266         *
267         * @param client
268         * @param message
269         * @throws JMSException
270         */
271        public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
272            this.container.sendMessage(client, message);
273        }
274    
275        /**
276         * Acknowledge reciept of a message
277         *
278         * @param client
279         * @param ack
280         * @throws JMSException
281         */
282        public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
283            this.container.acknowledgeMessage(client, ack);
284        }
285    
286        /**
287         * Command to delete a durable topic subscription
288         *
289         * @param client
290         * @param ds
291         * @throws JMSException
292         */
293        public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
294            this.container.durableUnsubscribe(client, ds);
295        }
296    
297    
298        /**
299         * @param channel - client to add
300         */
301        public void addClient(TransportChannel channel) {
302            try {
303                BrokerClient client = new BrokerClientImpl();
304                client.initialize(this, channel);
305                if (log.isDebugEnabled()) {
306                    log.debug("Starting new client: " + client);
307                }
308                channel.setServerSide(true);
309                channel.start();
310                clients.put(channel, client);
311            }
312            catch (JMSException e) {
313                log.error("Failed to add client due to: " + e, e);
314            }
315        }
316    
317        /**
318         * @param channel - client to remove
319         */
320        public void removeClient(TransportChannel channel) {
321            BrokerClient client = (BrokerClient) clients.remove(channel);
322            if (client != null) {
323                if (log.isDebugEnabled()) {
324                    log.debug("Client leaving client: " + client);
325                }
326    
327                // we may have already been closed, if not then lets simulate a normal shutdown
328                client.cleanUp();
329            }
330            else {
331                // might have got a duplicate callback
332                log.warn("No such client for channel: " + channel);
333            }
334        }
335    
336        /**
337         * @return the BrokerContainer for this Connector
338         */
339        public BrokerContainer getBrokerContainer() {
340            return this.container;
341        }
342    
343        /**
344         * Start an XA transaction.
345         *
346         * @see org.activemq.broker.BrokerConnector#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
347         */
348        public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
349            this.container.startTransaction(client, xid);
350        }
351    
352        /**
353         * Gets the prepared XA transactions.
354         *
355         * @see org.activemq.broker.BrokerConnector#getPreparedTransactions(org.activemq.broker.BrokerClient)
356         */
357        public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
358            return this.container.getPreparedTransactions(client);
359        }
360    
361        /**
362         * Prepare an XA transaction.
363         *
364         * @see org.activemq.broker.BrokerConnector#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
365         */
366        public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
367            return this.container.prepareTransaction(client, xid);
368        }
369    
370        /**
371         * Rollback an XA transaction.
372         *
373         * @see org.activemq.broker.BrokerConnector#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
374         */
375        public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
376            this.container.rollbackTransaction(client, xid);
377        }
378    
379        /**
380         * Commit an XA transaction.
381         *
382         * @see org.activemq.broker.BrokerConnector#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
383         */
384        public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
385            this.container.commitTransaction(client, xid, onePhase);
386        }
387    
388        /**
389         * @see org.activemq.broker.BrokerConnector#getResourceManagerId(org.activemq.broker.BrokerClient)
390         */
391        public String getResourceManagerId(BrokerClient client) {
392            // TODO: I think we need to return a better (more unique) RM id.
393            return getBrokerInfo().getBrokerName();
394        }
395    
396    
397        // Implementation methods
398        //-------------------------------------------------------------------------
399        /**
400         * Factory method ot create a transport channel
401         *
402         * @param bindAddress
403         * @return @throws JMSException
404         * @throws JMSException
405         */
406        protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
407            URI url;
408            try {
409                url = new URI(bindAddress);
410            }
411            catch (URISyntaxException e) {
412                JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
413                jmsEx.setLinkedException(e);
414                throw jmsEx;
415            }
416            return TransportServerChannelProvider.create(wireFormat, url);
417        }
418    
419    }