View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.composite;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.PacketListener;
25  import org.codehaus.activemq.message.Receipt;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.transport.AbstractTransportChannel;
28  import org.codehaus.activemq.transport.TransportChannel;
29  import org.codehaus.activemq.transport.TransportChannelProvider;
30  
31  import javax.jms.ExceptionListener;
32  import javax.jms.JMSException;
33  import java.net.URI;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.List;
37  
38  /***
39   * A Compsite implementation of a TransportChannel
40   *
41   * @version $Revision: 1.6 $
42   */
43  public class CompositeTransportChannel extends AbstractTransportChannel {
44      private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);
45  
46      protected WireFormat wireFormat;
47      protected URI[] uris;
48      protected TransportChannel channel;
49      protected SynchronizedBoolean closed;
50      protected SynchronizedBoolean started;
51      protected int retryCount = 10;
52      protected long failureSleepTime = 500L;
53      protected URI currentURI;
54  
55  
56      public CompositeTransportChannel(WireFormat wireFormat, URI[] uris) {
57          this.wireFormat = wireFormat;
58          this.uris = uris;
59          closed = new SynchronizedBoolean(false);
60          started = new SynchronizedBoolean(false);
61      }
62  
63      public String toString() {
64          return "CompositeTransportChannel: " + channel;
65      }
66  
67      public void start() throws JMSException {
68          if (started.commit(false, true)) {
69              establishConnection();
70          }
71      }
72  
73      /***
74       * close the channel
75       */
76      public void stop() {
77          if (closed.commit(false, true)) {
78              if (channel != null) {
79                  try {
80                      channel.stop();
81                  }
82                  catch (Exception e) {
83                      log.warn("Caught while closing: " + e + ". Now Closed", e);
84                  }
85                  finally {
86                      channel = null;
87                      super.stop();
88                  }
89              }
90          }
91      }
92  
93      public Receipt send(Packet packet) throws JMSException {
94          return getChannel().send(packet);
95      }
96  
97  
98      public Receipt send(Packet packet, int timeout) throws JMSException {
99          return getChannel().send(packet, timeout);
100     }
101 
102 
103     public void asyncSend(Packet packet) throws JMSException {
104         getChannel().asyncSend(packet);
105     }
106 
107     public void setPacketListener(PacketListener listener) {
108         super.setPacketListener(listener);
109         if (channel != null) {
110             channel.setPacketListener(listener);
111         }
112     }
113 
114 
115     public void setExceptionListener(ExceptionListener listener) {
116         super.setExceptionListener(listener);
117         if (channel != null) {
118             channel.setExceptionListener(listener);
119         }
120     }
121 
122 
123     public boolean isMulticast() {
124         return false;
125     }
126 
127 
128     // Implementation methods
129     //-------------------------------------------------------------------------
130     
131     protected void establishConnection() throws JMSException {
132 
133         // lets try connect
134         boolean connected = false;
135         long time = failureSleepTime;
136         for (int i = 0; !connected && i < retryCount; i++) {
137             if (i > 0) {
138                 log.info("Sleeping for: " + time + " millis and trying again");
139                 try {
140                     Thread.sleep(time);
141                 }
142                 catch (InterruptedException e) {
143                     log.warn("Sleep interupted: " + e, e);
144                 }
145                 time *= 2;
146             }
147 
148             List list = new ArrayList(Arrays.asList(uris));
149             while (!connected && !list.isEmpty()) {
150                 URI uri = extractURI(list);
151                 try {
152                     attemptToConnect(uri);
153                     configureChannel();
154                     connected = true;
155                     currentURI = uri;
156                 }
157                 catch (JMSException e) {
158                     log.info("Could not connect to: " + uri + ". Reason: " + e);
159                 }
160             }
161 
162         }
163         if (!connected) {
164             StringBuffer buffer = new StringBuffer("");
165             for (int i = 0; i < uris.length; i++) {
166                 buffer.append(uris[i]);
167                 if (i < (uris.length - 1)) {
168                     buffer.append(",");
169                 }
170             }
171             JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString());
172             throw jmsEx;
173         }
174 
175     }
176 
177     protected TransportChannel getChannel() throws JMSException {
178         if (channel == null) {
179             throw new JMSException("No TransportChannel connection available");
180         }
181         return channel;
182     }
183 
184     protected void configureChannel() {
185         ExceptionListener exceptionListener = getExceptionListener();
186         if (exceptionListener != null) {
187             channel.setExceptionListener(exceptionListener);
188         }
189         PacketListener packetListener = getPacketListener();
190         if (packetListener != null) {
191             channel.setPacketListener(packetListener);
192         }
193     }
194 
195 
196     protected URI extractURI(List list) throws JMSException {
197         int idx = 0;
198         if (list.size() > 1) {
199             do {
200                 idx = (int) (Math.random() * list.size());
201             }
202             while (idx < 0 || idx >= list.size());
203         }
204         return (URI) list.remove(idx);
205     }
206 
207     protected void attemptToConnect(URI uri) throws JMSException {
208         channel = TransportChannelProvider.create(wireFormat, uri);
209         if (started.get()) {
210             channel.start();
211         }
212     }
213 }