View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.jrms;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import com.sun.multicast.reliable.RMException;
22  import com.sun.multicast.reliable.transport.RMPacketSocket;
23  import com.sun.multicast.reliable.transport.SessionDoneException;
24  import com.sun.multicast.reliable.transport.TransportProfile;
25  import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.message.Packet;
29  import org.codehaus.activemq.message.WireFormat;
30  import org.codehaus.activemq.transport.AbstractTransportChannel;
31  import org.codehaus.activemq.util.IdGenerator;
32  
33  import javax.jms.JMSException;
34  import java.io.IOException;
35  import java.net.DatagramPacket;
36  import java.net.InetAddress;
37  import java.net.URI;
38  
39  /***
40   * A JRMS implementation of a TransportChannel
41   *
42   * @version $Revision: 1.16 $
43   */
44  public class JRMSTransportChannel extends AbstractTransportChannel implements Runnable {
45  
46      private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
47      private static final Log log = LogFactory.getLog(JRMSTransportChannel.class);
48  
49      private WireFormat wireFormat;
50      private SynchronizedBoolean closed;
51      private SynchronizedBoolean started;
52      private Thread thread; //need to change this - and use a thread pool
53      // need to see our own messages
54      private RMPacketSocket socket;
55      private IdGenerator idGenerator;
56      private String channelId;
57      private int port;
58      private InetAddress inetAddress;
59      private Object lock;
60  
61      /***
62       * Construct basic helpers
63       */
64      protected JRMSTransportChannel(WireFormat wireFormat) {
65          this.wireFormat = wireFormat;
66          idGenerator = new IdGenerator();
67          channelId = idGenerator.generateId();
68          closed = new SynchronizedBoolean(false);
69          started = new SynchronizedBoolean(false);
70          lock = new Object();
71      }
72  
73      /***
74       * Connect to a remote Node - e.g. a Broker
75       *
76       * @param remoteLocation
77       * @throws JMSException
78       */
79      public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
80          this(wireFormat);
81          try {
82              this.port = remoteLocation.getPort();
83              this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
84              LRMPTransportProfile profile = new LRMPTransportProfile(inetAddress, port);
85              profile.setTTL((byte) 1);
86              profile.setOrdered(true);
87              this.socket = profile.createRMPacketSocket(TransportProfile.SEND_RECEIVE);
88          }
89          catch (Exception ioe) {
90              ioe.printStackTrace();
91              JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe);
92              jmsEx.setLinkedException(ioe);
93              throw jmsEx;
94          }
95      }
96  
97      /***
98       * close the channel
99       */
100     public void stop() {
101         if (closed.commit(false, true)) {
102             super.stop();
103             try {
104                 socket.close();
105             }
106             catch (Exception e) {
107                 log.trace(toString() + " now closed");
108             }
109         }
110     }
111 
112     /***
113      * start listeneing for events
114      *
115      * @throws JMSException if an error occurs
116      */
117     public void start() throws JMSException {
118         if (started.commit(false, true)) {
119             thread = new Thread(this, "Thread:" + toString());
120             thread.setDaemon(true);
121             thread.start();
122         }
123     }
124 
125     /***
126      * Asynchronously send a Packet
127      *
128      * @param packet
129      * @throws JMSException
130      */
131     public void asyncSend(Packet packet) throws JMSException {
132         try {
133             DatagramPacket dpacket = createDatagramPacket(packet);
134 
135             // lets sync to avoid concurrent writes
136             //synchronized (lock) {
137             socket.send(dpacket);
138             //}
139         }
140         catch (RMException rme) {
141             JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage());
142             jmsEx.setLinkedException(rme);
143             throw jmsEx;
144         }
145         catch (IOException e) {
146             JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage());
147             jmsEx.setLinkedException(e);
148             throw jmsEx;
149         }
150     }
151 
152 
153     public boolean isMulticast() {
154         return true;
155     }
156 
157     /***
158      * reads packets from a Socket
159      */
160     public void run() {
161         try {
162             while (!closed.get()) {
163                 DatagramPacket dpacket = socket.receive();
164                 Packet packet = wireFormat.readPacket(channelId, dpacket);
165                 if (packet != null) {
166                     doConsumePacket(packet);
167                 }
168             }
169             log.trace("The socket peer is now closed");
170             //doClose(new IOException("Socket peer is now closed"));
171             stop();
172         }
173         catch (SessionDoneException e) {
174             // this isn't really an exception, it just indicates
175             // that the socket has closed normally
176             log.trace("Session completed", e);
177             stop();
178         }
179         catch (RMException ste) {
180             doClose(ste);
181         }
182         catch (IOException e) {
183             doClose(e);
184         }
185     }
186 
187     protected DatagramPacket createDatagramPacket() {
188         DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
189         answer.setPort(port);
190         answer.setAddress(inetAddress);
191         return answer;
192     }
193 
194     protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
195         DatagramPacket answer = wireFormat.writePacket(channelId, packet);
196         answer.setPort(port);
197         answer.setAddress(inetAddress);
198         return answer;
199     }
200 
201     private void doClose(Exception ex) {
202         if (!closed.get()) {
203             JMSException jmsEx = new JMSException("Error reading socket: " + ex);
204             jmsEx.setLinkedException(ex);
205             onAsyncException(jmsEx);
206             stop();
207         }
208     }
209 
210     /***
211      * pretty print for object
212      *
213      * @return String representation of this object
214      */
215     public String toString() {
216         return "JRMSTransportChannel: " + socket;
217     }
218 }