1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.jgroups;
19
20 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21 import EDU.oswego.cs.dl.util.concurrent.Executor;
22 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
23 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.codehaus.activemq.message.Packet;
27 import org.codehaus.activemq.message.WireFormat;
28 import org.codehaus.activemq.transport.AbstractTransportChannel;
29 import org.codehaus.activemq.util.JMSExceptionHelper;
30 import org.jgroups.Address;
31 import org.jgroups.Channel;
32 import org.jgroups.ChannelClosedException;
33 import org.jgroups.ChannelException;
34 import org.jgroups.ChannelNotConnectedException;
35 import org.jgroups.Message;
36 import org.jgroups.TimeoutException;
37
38 import javax.jms.JMSException;
39 import java.io.IOException;
40
41 /***
42 * A JGroups implementation of a TransportChannel
43 *
44 * @version $Revision: 1.5 $
45 */
46 public class JGroupsTransportChannel extends AbstractTransportChannel implements Runnable {
47 private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);
48
49 private Channel channel;
50 private Address localAddress = null;
51 private WireFormat wireFormat;
52 private SynchronizedBoolean closed;
53 private SynchronizedBoolean started;
54 private Object outboundLock;
55 private Executor executor;
56 private Thread thread;
57 private boolean useAsyncSend = false;
58
59 public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
60 this.wireFormat = wireFormat;
61 this.channel = channel;
62 this.executor = executor;
63 this.localAddress = channel.getLocalAddress();
64
65 closed = new SynchronizedBoolean(false);
66 started = new SynchronizedBoolean(false);
67 outboundLock = new Object();
68 if (useAsyncSend) {
69 executor = new PooledExecutor(new BoundedBuffer(1000), 1);
70 }
71 }
72
73 public String toString() {
74 return "JGroupsTransportChannel: " + channel;
75 }
76
77 /***
78 * close the channel
79 */
80 public void stop() {
81 if (closed.commit(false, true)) {
82 super.stop();
83 try {
84 stopExecutor(executor);
85 channel.disconnect();
86 channel.close();
87 }
88 catch (Exception e) {
89 log.warn("Caught while closing: " + e + ". Now Closed", e);
90 }
91 }
92 }
93
94 /***
95 * start listeneing for events
96 *
97 * @throws javax.jms.JMSException if an error occurs
98 */
99 public void start() throws JMSException {
100 if (started.commit(false, true)) {
101 thread = new Thread(this, "Thread:" + toString());
102 thread.setDaemon(true);
103 thread.start();
104 }
105 }
106
107
108 /***
109 * Asynchronously send a Packet
110 *
111 * @param packet
112 * @throws javax.jms.JMSException
113 */
114 public void asyncSend(final Packet packet) throws JMSException {
115 if (executor != null) {
116 try {
117 executor.execute(new Runnable() {
118 public void run() {
119 try {
120 writePacket(packet);
121 }
122 catch (JMSException e) {
123 onAsyncException(e);
124 }
125 }
126 });
127 }
128 catch (InterruptedException e) {
129 log.info("Caught: " + e, e);
130 }
131 }
132 else {
133 writePacket(packet);
134 }
135 }
136
137
138 public boolean isMulticast() {
139 return true;
140 }
141
142 /***
143 * reads packets from a Socket
144 */
145 public void run() {
146 log.trace("JGroups consumer thread starting");
147 while (!closed.get()) {
148 try {
149 Object value = channel.receive(0L);
150 if (value instanceof Message) {
151 Message message = (Message) value;
152
153
154
155 if (!localAddress.equals(message.getSrc())) {
156 byte[] data = message.getBuffer();
157 Packet packet = wireFormat.fromBytes(data);
158 if (packet != null) {
159 doConsumePacket(packet);
160 }
161 }
162 }
163
164
165
166
167
168
169
170
171
172 }
173 catch (IOException e) {
174 doClose(e);
175 }
176 catch (ChannelClosedException e) {
177 stop();
178 }
179 catch (ChannelNotConnectedException e) {
180 doClose(e);
181 }
182 catch (TimeoutException e) {
183
184 }
185 }
186 }
187
188 /***
189 * writes the packet to the channel
190 */
191 protected void writePacket(Packet packet) throws JMSException {
192 try {
193 synchronized (outboundLock) {
194 Address dest = null;
195 Message message = new Message(dest, localAddress, wireFormat.toBytes(packet));
196 channel.send(message);
197 }
198 }
199 catch (ChannelException e) {
200 throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
201 }
202 catch (IOException e) {
203 throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
204 }
205 }
206
207
208 private void doClose(Exception ex) {
209 if (!closed.get()) {
210 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
211 stop();
212 }
213 }
214 }