View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.jgroups;
19  
20  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21  import EDU.oswego.cs.dl.util.concurrent.Executor;
22  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
23  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.codehaus.activemq.message.Packet;
27  import org.codehaus.activemq.message.WireFormat;
28  import org.codehaus.activemq.transport.AbstractTransportChannel;
29  import org.codehaus.activemq.util.JMSExceptionHelper;
30  import org.jgroups.Address;
31  import org.jgroups.Channel;
32  import org.jgroups.ChannelClosedException;
33  import org.jgroups.ChannelException;
34  import org.jgroups.ChannelNotConnectedException;
35  import org.jgroups.Message;
36  import org.jgroups.TimeoutException;
37  
38  import javax.jms.JMSException;
39  import java.io.IOException;
40  
41  /***
42   * A JGroups implementation of a TransportChannel
43   *
44   * @version $Revision: 1.5 $
45   */
46  public class JGroupsTransportChannel extends AbstractTransportChannel implements Runnable {
47      private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);
48  
49      private Channel channel;
50      private Address localAddress = null;
51      private WireFormat wireFormat;
52      private SynchronizedBoolean closed;
53      private SynchronizedBoolean started;
54      private Object outboundLock;
55      private Executor executor;
56      private Thread thread; //need to change this - and use a thread pool
57      private boolean useAsyncSend = false;
58  
59      public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
60          this.wireFormat = wireFormat;
61          this.channel = channel;
62          this.executor = executor;
63          this.localAddress = channel.getLocalAddress();
64  
65          closed = new SynchronizedBoolean(false);
66          started = new SynchronizedBoolean(false);
67          outboundLock = new Object();
68          if (useAsyncSend) {
69              executor = new PooledExecutor(new BoundedBuffer(1000), 1);
70          }
71      }
72  
73      public String toString() {
74          return "JGroupsTransportChannel: " + channel;
75      }
76  
77      /***
78       * close the channel
79       */
80      public void stop() {
81          if (closed.commit(false, true)) {
82              super.stop();
83              try {
84                  stopExecutor(executor);
85                  channel.disconnect();
86                  channel.close();
87              }
88              catch (Exception e) {
89                  log.warn("Caught while closing: " + e + ". Now Closed", e);
90              }
91          }
92      }
93  
94      /***
95       * start listeneing for events
96       *
97       * @throws javax.jms.JMSException if an error occurs
98       */
99      public void start() throws JMSException {
100         if (started.commit(false, true)) {
101             thread = new Thread(this, "Thread:" + toString());
102             thread.setDaemon(true);
103             thread.start();
104         }
105     }
106 
107 
108     /***
109      * Asynchronously send a Packet
110      *
111      * @param packet
112      * @throws javax.jms.JMSException
113      */
114     public void asyncSend(final Packet packet) throws JMSException {
115         if (executor != null) {
116             try {
117                 executor.execute(new Runnable() {
118                     public void run() {
119                         try {
120                             writePacket(packet);
121                         }
122                         catch (JMSException e) {
123                             onAsyncException(e);
124                         }
125                     }
126                 });
127             }
128             catch (InterruptedException e) {
129                 log.info("Caught: " + e, e);
130             }
131         }
132         else {
133             writePacket(packet);
134         }
135     }
136 
137 
138     public boolean isMulticast() {
139         return true;
140     }
141 
142     /***
143      * reads packets from a Socket
144      */
145     public void run() {
146         log.trace("JGroups consumer thread starting");
147         while (!closed.get()) {
148             try {
149                 Object value = channel.receive(0L);
150                 if (value instanceof Message) {
151                     Message message = (Message) value;
152 
153                     // lets discard messages coming from the local address
154                     // to avoid infinite loops when used with the JMS broker
155                     if (!localAddress.equals(message.getSrc())) {
156                         byte[] data = message.getBuffer();
157                         Packet packet = wireFormat.fromBytes(data);
158                         if (packet != null) {
159                             doConsumePacket(packet);
160                         }
161                     }
162                 }
163                 /*
164                 else {
165                     String type = "";
166                     if (value != null) {
167                         type = " of type: " + value.getClass();
168                     }
169                     log.warn("Expected instanceof Message but received: " + value + type);
170                 }
171                 */
172             }
173             catch (IOException e) {
174                 doClose(e);
175             }
176             catch (ChannelClosedException e) {
177                 stop();
178             }
179             catch (ChannelNotConnectedException e) {
180                 doClose(e);
181             }
182             catch (TimeoutException e) {
183                 // ignore timeouts
184             }
185         }
186     }
187 
188     /***
189      * writes the packet to the channel
190      */
191     protected void writePacket(Packet packet) throws JMSException {
192         try {
193             synchronized (outboundLock) {
194                 Address dest = null;
195                 Message message = new Message(dest, localAddress, wireFormat.toBytes(packet));
196                 channel.send(message);
197             }
198         }
199         catch (ChannelException e) {
200             throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
201         }
202         catch (IOException e) {
203             throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
204         }
205     }
206 
207 
208     private void doClose(Exception ex) {
209         if (!closed.get()) {
210             onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
211             stop();
212         }
213     }
214 }