1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.composite;
19
20 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.Packet;
24 import org.codehaus.activemq.message.PacketListener;
25 import org.codehaus.activemq.message.Receipt;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.transport.AbstractTransportChannel;
28 import org.codehaus.activemq.transport.TransportChannel;
29 import org.codehaus.activemq.transport.TransportChannelProvider;
30
31 import javax.jms.ExceptionListener;
32 import javax.jms.JMSException;
33 import java.net.URI;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.List;
37
38 /***
39 * A Compsite implementation of a TransportChannel
40 *
41 * @version $Revision: 1.6 $
42 */
43 public class CompositeTransportChannel extends AbstractTransportChannel {
44 private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);
45
46 protected WireFormat wireFormat;
47 protected URI[] uris;
48 protected TransportChannel channel;
49 protected SynchronizedBoolean closed;
50 protected SynchronizedBoolean started;
51 protected int retryCount = 10;
52 protected long failureSleepTime = 500L;
53 protected URI currentURI;
54
55
56 public CompositeTransportChannel(WireFormat wireFormat, URI[] uris) {
57 this.wireFormat = wireFormat;
58 this.uris = uris;
59 closed = new SynchronizedBoolean(false);
60 started = new SynchronizedBoolean(false);
61 }
62
63 public String toString() {
64 return "CompositeTransportChannel: " + channel;
65 }
66
67 public void start() throws JMSException {
68 if (started.commit(false, true)) {
69 establishConnection();
70 }
71 }
72
73 /***
74 * close the channel
75 */
76 public void stop() {
77 if (closed.commit(false, true)) {
78 if (channel != null) {
79 try {
80 channel.stop();
81 }
82 catch (Exception e) {
83 log.warn("Caught while closing: " + e + ". Now Closed", e);
84 }
85 finally {
86 channel = null;
87 super.stop();
88 }
89 }
90 }
91 }
92
93 public Receipt send(Packet packet) throws JMSException {
94 return getChannel().send(packet);
95 }
96
97
98 public Receipt send(Packet packet, int timeout) throws JMSException {
99 return getChannel().send(packet, timeout);
100 }
101
102
103 public void asyncSend(Packet packet) throws JMSException {
104 getChannel().asyncSend(packet);
105 }
106
107 public void setPacketListener(PacketListener listener) {
108 super.setPacketListener(listener);
109 if (channel != null) {
110 channel.setPacketListener(listener);
111 }
112 }
113
114
115 public void setExceptionListener(ExceptionListener listener) {
116 super.setExceptionListener(listener);
117 if (channel != null) {
118 channel.setExceptionListener(listener);
119 }
120 }
121
122
123 public boolean isMulticast() {
124 return false;
125 }
126
127
128
129
130
131 protected void establishConnection() throws JMSException {
132
133
134 boolean connected = false;
135 long time = failureSleepTime;
136 for (int i = 0; !connected && i < retryCount; i++) {
137 if (i > 0) {
138 log.info("Sleeping for: " + time + " millis and trying again");
139 try {
140 Thread.sleep(time);
141 }
142 catch (InterruptedException e) {
143 log.warn("Sleep interupted: " + e, e);
144 }
145 time *= 2;
146 }
147
148 List list = new ArrayList(Arrays.asList(uris));
149 while (!connected && !list.isEmpty()) {
150 URI uri = extractURI(list);
151 try {
152 attemptToConnect(uri);
153 configureChannel();
154 connected = true;
155 currentURI = uri;
156 }
157 catch (JMSException e) {
158 log.info("Could not connect to: " + uri + ". Reason: " + e);
159 }
160 }
161
162 }
163 if (!connected) {
164 StringBuffer buffer = new StringBuffer("");
165 for (int i = 0; i < uris.length; i++) {
166 buffer.append(uris[i]);
167 if (i < (uris.length - 1)) {
168 buffer.append(",");
169 }
170 }
171 JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString());
172 throw jmsEx;
173 }
174
175 }
176
177 protected TransportChannel getChannel() throws JMSException {
178 if (channel == null) {
179 throw new JMSException("No TransportChannel connection available");
180 }
181 return channel;
182 }
183
184 protected void configureChannel() {
185 ExceptionListener exceptionListener = getExceptionListener();
186 if (exceptionListener != null) {
187 channel.setExceptionListener(exceptionListener);
188 }
189 PacketListener packetListener = getPacketListener();
190 if (packetListener != null) {
191 channel.setPacketListener(packetListener);
192 }
193 }
194
195
196 protected URI extractURI(List list) throws JMSException {
197 int idx = 0;
198 if (list.size() > 1) {
199 do {
200 idx = (int) (Math.random() * list.size());
201 }
202 while (idx < 0 || idx >= list.size());
203 }
204 return (URI) list.remove(idx);
205 }
206
207 protected void attemptToConnect(URI uri) throws JMSException {
208 channel = TransportChannelProvider.create(wireFormat, uri);
209 if (started.get()) {
210 channel.start();
211 }
212 }
213 }