1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.transport.reliable;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.Packet;
24 import org.codehaus.activemq.message.PacketListener;
25 import org.codehaus.activemq.message.Receipt;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.transport.TransportChannel;
28 import org.codehaus.activemq.transport.TransportStatusEvent;
29 import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
30
31 import javax.jms.ExceptionListener;
32 import javax.jms.JMSException;
33 import java.net.URI;
34 import java.util.LinkedList;
35 import java.util.List;
36
37 /***
38 * A Compsite implementation of a TransportChannel
39 *
40 * @version $Revision: 1.4 $
41 */
42 public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
43 private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
44 private Object lock = new Object();
45 private LinkedList packetList = new LinkedList();
46 private boolean cacheMessagesForFailover;
47
48 /***
49 * Constructor for ReliableTransportChannel
50 *
51 * @param wireFormat
52 * @param uris
53 * @throws JMSException
54 */
55 public ReliableTransportChannel(WireFormat wireFormat, URI[] uris) throws JMSException {
56 super(wireFormat, uris);
57 establishConnection();
58 }
59
60 /***
61 * @return pretty print for this
62 */
63 public String toString() {
64 return "ReliableTransportChannel: " + channel;
65 }
66
67 /***
68 * start the connection
69 *
70 * @throws JMSException
71 */
72 public void start() throws JMSException {
73 if (started.commit(false, true)) {
74 if (channel != null) {
75 channel.start();
76 }
77 }
78 }
79
80 /***
81 * @param packet
82 * @param timeout
83 * @return @throws JMSException
84 */
85 public Receipt send(Packet packet, int timeout) throws JMSException {
86 TransportChannel tc = this.channel;
87 while (true) {
88 try {
89 return tc.send(packet, timeout);
90 }
91 catch (JMSException jmsEx) {
92 doReconnect(tc);
93 }
94 }
95 }
96
97 public void asyncSend(Packet packet) throws JMSException {
98 TransportChannel tc = this.channel;
99 while (true) {
100 try {
101 tc.asyncSend(packet);
102 break;
103 }
104 catch (JMSException jmsEx) {
105 doReconnect(tc);
106 }
107 }
108 }
109
110 protected void configureChannel() {
111 channel.setPacketListener(this);
112 channel.setExceptionListener(this);
113 }
114
115 protected URI extractURI(List list) throws JMSException {
116 int idx = 0;
117 if (list.size() > 1) {
118 SMLCGRandom rand = new SMLCGRandom();
119 do {
120 idx = (int) (rand.nextDouble() * list.size());
121 }
122 while (idx < 0 || idx >= list.size());
123 }
124 return (URI) list.remove(idx);
125 }
126
127 /***
128 * consume a packet from the enbedded channel
129 *
130 * @param packet to consume
131 */
132 public void consume(Packet packet) {
133
134
135 PacketListener listener = getPacketListener();
136 if (listener != null) {
137 listener.consume(packet);
138 }
139 }
140
141 /***
142 * handle exception from the embedded channel
143 *
144 * @param jmsEx
145 */
146 public void onException(JMSException jmsEx) {
147 TransportChannel tc = this.channel;
148 try {
149 doReconnect(tc);
150 }
151 catch (JMSException ex) {
152 ex.setLinkedException(jmsEx);
153 fireException(ex);
154 }
155 }
156
157 public void stop() {
158 super.stop();
159 fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
160 }
161
162 /***
163 * Fire a JMSException to the exception listener
164 *
165 * @param jmsEx
166 */
167 protected void fireException(JMSException jmsEx) {
168 ExceptionListener listener = getExceptionListener();
169 if (listener != null) {
170 listener.onException(jmsEx);
171 }
172 }
173
174 protected void doReconnect(TransportChannel currentChannel) throws JMSException {
175 if (!closed.get()) {
176 synchronized (lock) {
177
178
179 if (this.channel == currentChannel) {
180 fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
181 try {
182 establishConnection();
183 }
184 catch (JMSException jmsEx) {
185 fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
186 throw jmsEx;
187 }
188 fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
189 }
190 }
191 }
192 }
193 }