View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport.reliable;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.PacketListener;
25  import org.codehaus.activemq.message.Receipt;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.transport.TransportChannel;
28  import org.codehaus.activemq.transport.TransportStatusEvent;
29  import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
30  
31  import javax.jms.ExceptionListener;
32  import javax.jms.JMSException;
33  import java.net.URI;
34  import java.util.LinkedList;
35  import java.util.List;
36  
37  /***
38   * A Compsite implementation of a TransportChannel
39   *
40   * @version $Revision: 1.4 $
41   */
42  public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
43      private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
44      private Object lock = new Object();
45      private LinkedList packetList = new LinkedList();
46      private boolean cacheMessagesForFailover;
47  
48      /***
49       * Constructor for ReliableTransportChannel
50       *
51       * @param wireFormat
52       * @param uris
53       * @throws JMSException
54       */
55      public ReliableTransportChannel(WireFormat wireFormat, URI[] uris) throws JMSException {
56          super(wireFormat, uris);
57          establishConnection();
58      }
59  
60      /***
61       * @return pretty print for this
62       */
63      public String toString() {
64          return "ReliableTransportChannel: " + channel;
65      }
66  
67      /***
68       * start the connection
69       *
70       * @throws JMSException
71       */
72      public void start() throws JMSException {
73          if (started.commit(false, true)) {
74              if (channel != null) {
75                  channel.start();
76              }
77          }
78      }
79  
80      /***
81       * @param packet
82       * @param timeout
83       * @return @throws JMSException
84       */
85      public Receipt send(Packet packet, int timeout) throws JMSException {
86          TransportChannel tc = this.channel;
87          while (true) {
88              try {
89                  return tc.send(packet, timeout);
90              }
91              catch (JMSException jmsEx) {
92                  doReconnect(tc);
93              }
94          }
95      }
96  
97      public void asyncSend(Packet packet) throws JMSException {
98          TransportChannel tc = this.channel;
99          while (true) {
100             try {
101                 tc.asyncSend(packet);
102                 break;
103             }
104             catch (JMSException jmsEx) {
105                 doReconnect(tc);
106             }
107         }
108     }
109 
110     protected void configureChannel() {
111         channel.setPacketListener(this);
112         channel.setExceptionListener(this);
113     }
114 
115     protected URI extractURI(List list) throws JMSException {
116         int idx = 0;
117         if (list.size() > 1) {
118             SMLCGRandom rand = new SMLCGRandom();
119             do {
120                 idx = (int) (rand.nextDouble() * list.size());
121             }
122             while (idx < 0 || idx >= list.size());
123         }
124         return (URI) list.remove(idx);
125     }
126 
127     /***
128      * consume a packet from the enbedded channel
129      *
130      * @param packet to consume
131      */
132     public void consume(Packet packet) {
133         //do processing
134         //avoid a lock
135         PacketListener listener = getPacketListener();
136         if (listener != null) {
137             listener.consume(packet);
138         }
139     }
140 
141     /***
142      * handle exception from the embedded channel
143      *
144      * @param jmsEx
145      */
146     public void onException(JMSException jmsEx) {
147         TransportChannel tc = this.channel;
148         try {
149             doReconnect(tc);
150         }
151         catch (JMSException ex) {
152             ex.setLinkedException(jmsEx);
153             fireException(ex);
154         }
155     }
156 
157     public void stop() {
158         super.stop();
159         fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
160     }
161 
162     /***
163      * Fire a JMSException to the exception listener
164      *
165      * @param jmsEx
166      */
167     protected void fireException(JMSException jmsEx) {
168         ExceptionListener listener = getExceptionListener();
169         if (listener != null) {
170             listener.onException(jmsEx);
171         }
172     }
173 
174     protected void doReconnect(TransportChannel currentChannel) throws JMSException {
175         if (!closed.get()) {
176             synchronized (lock) {
177                 //Loss of connectivity can be signalled from more than one
178                 //thread - hence the check here - we want to avoid doing it more than once
179                 if (this.channel == currentChannel) {
180                     fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
181                     try {
182                         establishConnection();
183                     }
184                     catch (JMSException jmsEx) {
185                         fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
186                         throw jmsEx;
187                     }
188                     fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
189                 }
190             }
191         }
192     }
193 }