View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.udp;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.WireFormat;
25  import org.codehaus.activemq.transport.AbstractTransportChannel;
26  import org.codehaus.activemq.util.IdGenerator;
27  
28  import javax.jms.JMSException;
29  import java.io.IOException;
30  import java.net.DatagramPacket;
31  import java.net.DatagramSocket;
32  import java.net.InetAddress;
33  import java.net.SocketTimeoutException;
34  import java.net.URI;
35  
36  /***
37   * A UDP implementation of a TransportChannel
38   *
39   * @version $Revision: 1.19 $
40   */
41  public class UdpTransportChannel extends AbstractTransportChannel implements Runnable {
42  
43      private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
44      private static final int SO_TIMEOUT = 5000;
45      private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
46  
47      protected DatagramSocket socket;
48      protected int port;
49      protected InetAddress inetAddress;
50  
51      private WireFormat wireFormat;
52      private SynchronizedBoolean closed;
53      private SynchronizedBoolean started;
54      private Thread thread; //need to change this - and use a thread pool
55      private IdGenerator idGenerator = new IdGenerator();
56      private Object lock;
57  
58  
59      /***
60       * Construct basic helpers
61       */
62      protected UdpTransportChannel(WireFormat wireFormat) {
63          this.wireFormat = wireFormat;
64          closed = new SynchronizedBoolean(false);
65          started = new SynchronizedBoolean(false);
66          lock = new Object();
67      }
68  
69      public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
70          this(wireFormat, remoteLocation, remoteLocation.getPort());
71      }
72  
73      public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
74          this(wireFormat);
75          try {
76              this.port = port;
77              this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
78              this.socket = createSocket(remoteLocation.getPort());
79  
80              //log.info("Creating multicast socket on port: " + port + " on
81              // host: " + remoteLocation.getHost());
82  
83              socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
84              socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
85  
86              connect();
87              
88              // now lets update the port so that sends will go elsewhere
89          }
90          catch (Exception ioe) {
91              JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe);
92              jmsEx.setLinkedException(ioe);
93              throw jmsEx;
94          }
95      }
96  
97      /***
98       * @param socket
99       * @throws JMSException
100      */
101     public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException {
102         this(wireFormat);
103         this.socket = socket;
104         this.port = socket.getPort();
105         this.inetAddress = socket.getInetAddress();
106         try {
107             socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
108             socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
109         }
110         catch (IOException ioe) {
111             JMSException jmsEx = new JMSException("Initialization of TransportChannel failed");
112             jmsEx.setLinkedException(ioe);
113             throw jmsEx;
114         }
115     }
116 
117     public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
118         this(wireFormat, socket);
119         this.port = port;
120     }
121 
122     /***
123      * close the channel
124      */
125     public void stop() {
126         if (closed.commit(false, true)) {
127             super.stop();
128             try {
129                 socket.close();
130             }
131             catch (Exception e) {
132                 log.trace(toString() + " now closed");
133             }
134         }
135     }
136 
137     /***
138      * start listeneing for events
139      *
140      * @throws JMSException if an error occurs
141      */
142     public void start() throws JMSException {
143         if (started.commit(false, true)) {
144             thread = new Thread(this, "Thread:" + toString());
145             thread.setDaemon(true);
146             thread.start();
147         }
148     }
149 
150 
151     /***
152      * Asynchronously send a Packet
153      *
154      * @param packet
155      * @throws JMSException
156      */
157     public void asyncSend(Packet packet) throws JMSException {
158         try {
159             if (log.isDebugEnabled()) {
160                 log.debug("Sending packet: " + packet);
161             }
162             DatagramPacket dpacket = createDatagramPacket(packet);
163 
164 
165             // lets sync to avoid concurrent writes
166             //synchronized (lock) {
167             socket.send(dpacket);
168             //}
169         }
170         catch (IOException e) {
171             JMSException jmsEx = new JMSException("asyncSend failed " + e);
172             jmsEx.setLinkedException(e);
173             throw jmsEx;
174         }
175     }
176 
177     public boolean isMulticast() {
178         return false;
179     }
180 
181     /***
182      * reads packets from a Socket
183      */
184     public void run() {
185         while (!closed.get()) {
186             try {
187                 socket.setSoTimeout(SO_TIMEOUT);
188 
189                 DatagramPacket dpacket = createDatagramPacket();
190                 while (!socket.isClosed()) {
191                     socket.setSoTimeout(0);
192                     socket.receive(dpacket);
193                     Packet packet = wireFormat.readPacket(getClientID(), dpacket);
194                     if (packet != null) {
195                         doConsumePacket(packet);
196                     }
197                 }
198 
199                 log.trace("The socket peer is now closed");
200                 doClose(new IOException("Socket peer is now closed"));
201             }
202             catch (SocketTimeoutException ste) {
203                 //continue;
204             }
205             catch (IOException e) {
206                 doClose(e);
207             }
208         }
209     }
210 
211     /***
212      * @return
213      */
214     protected DatagramPacket createDatagramPacket() {
215         DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
216         if (port >= 0) {
217             answer.setPort(port);
218         }
219         answer.setAddress(inetAddress);
220         return answer;
221     }
222 
223     protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
224         /*if (packet instanceof ActiveMQMessage) {
225             ActiveMQMessage message = (ActiveMQMessage) packet;
226             System.out.println(">>> about to send message with clientID: " + message.getJMSClientID());
227         }*/
228         DatagramPacket answer = wireFormat.writePacket(getClientID(), packet);
229         if (port >= 0) {
230             answer.setPort(port);
231         }
232         answer.setAddress(inetAddress);
233         return answer;
234     }
235 
236     private void doClose(Exception ex) {
237         if (!closed.get()) {
238             JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
239             jmsEx.setLinkedException(ex);
240             onAsyncException(jmsEx);
241             stop();
242         }
243     }
244 
245     protected void connect() throws IOException {
246         //socket.connect(inetAddress, port);
247     }
248 
249     protected DatagramSocket createSocket(int port) throws IOException {
250         return new DatagramSocket(port, inetAddress);
251     }
252 
253     /***
254      * pretty print for object
255      *
256      * @return String representation of this object
257      */
258     public String toString() {
259         return "UdpTransportChannel: " + socket;
260     }
261 }