1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.vm;
19
20 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
21 import EDU.oswego.cs.dl.util.concurrent.Channel;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.message.Packet;
26 import org.codehaus.activemq.transport.AbstractTransportChannel;
27 import org.codehaus.activemq.transport.TransportChannel;
28
29 import javax.jms.JMSException;
30
31 /***
32 * A VM implementation of a TransportChannel
33 *
34 * @version $Revision: 1.17 $
35 */
36 public class VmTransportChannel extends AbstractTransportChannel implements Runnable {
37
38 private static final Log log = LogFactory.getLog(VmTransportChannel.class);
39
40 private static final Object TERMINATE = new Object();
41
42 private Channel sendChannel;
43 private Channel receiveChannel;
44 private SynchronizedBoolean closed;
45 private SynchronizedBoolean started;
46 private Thread thread;
47 private static int lastThreadId = 0;
48
49 public VmTransportChannel(Channel sendChannel, Channel receiveChannel) {
50 this.sendChannel = sendChannel;
51 this.receiveChannel = receiveChannel;
52 closed = new SynchronizedBoolean(false);
53 started = new SynchronizedBoolean(false);
54 }
55
56 public VmTransportChannel() {
57 this(1000);
58 }
59
60 public VmTransportChannel(int capacity) {
61 this(new BoundedLinkedQueue(capacity), new BoundedLinkedQueue(capacity));
62 }
63
64 /***
65 * close the channel
66 */
67 public void stop() {
68 if (closed.commit(false, true)) {
69 super.stop();
70 try {
71
72 sendChannel.put(TERMINATE);
73
74
75 thread.join();
76 }
77 catch (Exception e) {
78 log.trace(toString() + " now closed with exception: " + e);
79 }
80 }
81 }
82
83 /***
84 * start listeneing for events
85 *
86 * @throws JMSException if an error occurs
87 */
88 public void start() throws JMSException {
89 if (started.commit(false, true)) {
90 thread = new Thread(this, "VM Transport: " + getNextThreadId());
91 thread.setDaemon(true);
92 thread.start();
93 }
94 }
95
96 static synchronized public int getNextThreadId() {
97 return lastThreadId++;
98 }
99
100 /***
101 * Asynchronously send a Packet
102 *
103 * @param packet
104 * @throws JMSException
105 */
106 public void asyncSend(Packet packet) throws JMSException {
107 while (true) {
108 try {
109 sendChannel.put(packet);
110 break;
111 }
112 catch (InterruptedException e) {
113
114 }
115 }
116 }
117
118
119 public boolean isMulticast() {
120 return false;
121 }
122
123 /***
124 * reads packets from a Socket
125 */
126 public void run() {
127 while (!closed.get()) {
128 try {
129 Object answer = receiveChannel.take();
130 if (answer == TERMINATE) {
131 log.trace("The socket peer is now closed");
132 stop();
133 return;
134 }
135 else if (answer != null) {
136 Packet packet = (Packet) answer;
137
138 if (closed.get()) {
139 break;
140 }
141 doConsumePacket(packet);
142 }
143 }
144 catch (InterruptedException e) {
145
146 }
147 }
148 }
149
150 private void doClose(Exception ex) {
151 if (!closed.get()) {
152 JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
153 jmsEx.setLinkedException(ex);
154 onAsyncException(jmsEx);
155 stop();
156 }
157 }
158
159 /***
160 * pretty print for object
161 *
162 * @return String representation of this object
163 */
164 public String toString() {
165 return "VmTransportChannel: " + sendChannel;
166 }
167
168 /***
169 * Creates the server side version of this client side channel. On the server side
170 * the client's side sendChannel is the receiveChannel and vice versa
171 *
172 * @return
173 */
174 public TransportChannel createServerSide() throws JMSException {
175 return new VmTransportChannel(receiveChannel, sendChannel);
176 }
177 }