1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activeio.adapter;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22
23 import org.activeio.AsynchChannel;
24 import org.activeio.AsynchChannelListener;
25 import org.activeio.Packet;
26 import org.activeio.SynchChannel;
27
28 import EDU.oswego.cs.dl.util.concurrent.Channel;
29 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
30
31 /***
32 * Adapts a {@see org.activeio.AsynchChannel} so that it provides an
33 * {@see org.activeio.SynchChannel} interface.
34 *
35 * This object buffers asynchronous messages from the {@see org.activeio.AsynchChannel}
36 * and buffers them in a {@see EDU.oswego.cs.dl.util.concurrent.Channel} util the client receives them.
37 *
38 * @version $Revision$
39 */
40 final public class AsynchToSynchChannelAdapter implements SynchChannel, AsynchChannelListener {
41
42 private final AsynchChannel asynchChannel;
43 private final Channel buffer;
44
45 static public SynchChannel adapt(org.activeio.Channel channel) {
46 return adapt(channel, new LinkedQueue());
47 }
48
49 static public SynchChannel adapt(org.activeio.Channel channel, Channel upPacketChannel) {
50
51
52 if( channel instanceof SynchChannel ) {
53 return (SynchChannel) channel;
54 }
55
56
57 if( channel.getClass() == SynchToAsynchChannelAdapter.class ) {
58 return ((SynchToAsynchChannelAdapter)channel).getSynchChannel();
59 }
60
61 return new AsynchToSynchChannelAdapter((AsynchChannel)channel, upPacketChannel);
62 }
63
64 /***
65 * @deprecated {@see #adapt(AsynchChannel)}
66 */
67 public AsynchToSynchChannelAdapter(AsynchChannel asynchChannel) {
68 this(asynchChannel, new LinkedQueue());
69 }
70
71 /***
72 * @deprecated {@see #adapt(AsynchChannel, Channel)}
73 */
74 public AsynchToSynchChannelAdapter(AsynchChannel asynchChannel, Channel upPacketChannel){
75 this.asynchChannel = asynchChannel;
76 this.asynchChannel.setAsynchChannelListener(this);
77 this.buffer=upPacketChannel;
78 }
79
80 /***
81 * @see org.activeio.Channel#write(org.activeio.channel.Packet)
82 */
83 public void write(org.activeio.Packet packet) throws IOException {
84 asynchChannel.write(packet);
85 }
86
87 /***
88 * @see org.activeio.Channel#flush()
89 */
90 public void flush() throws IOException {
91 asynchChannel.flush();
92 }
93
94 /***
95 * @see org.activeio.SynchChannel#read(long)
96 */
97 public Packet read(long timeout) throws IOException {
98 try {
99
100 Object o;
101 if( timeout == NO_WAIT_TIMEOUT ) {
102 o = buffer.poll(0);
103 } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
104 o = buffer.take();
105 } else {
106 o = buffer.poll(timeout);
107 }
108
109 if( o == null )
110 return null;
111
112 if( o instanceof Packet )
113 return (Packet)o;
114
115 Throwable e = (Throwable)o;
116 throw (IOException)new IOException("Asynch error occured: "+e).initCause(e);
117
118 } catch (InterruptedException e) {
119 throw new InterruptedIOException(e.getMessage());
120 }
121 }
122
123 /***
124 * @see org.activeio.Disposable#dispose()
125 */
126 public void dispose() {
127 asynchChannel.dispose();
128 }
129
130 /***
131 * @see org.activeio.Service#start()
132 */
133 public void start() throws IOException {
134 asynchChannel.start();
135 }
136
137 /***
138 * @see org.activeio.Service#stop(long)
139 */
140 public void stop(long timeout) throws IOException {
141 asynchChannel.stop(timeout);
142 }
143
144 /***
145 * @see org.activeio.AsynchChannelListener#onPacket(org.activeio.channel.Packet)
146 */
147 public void onPacket(Packet packet) {
148 try {
149 buffer.put(packet);
150 } catch (InterruptedException e) {
151 Thread.currentThread().interrupt();
152 }
153 }
154
155 /***
156 * @see org.activeio.AsynchChannelListener#onPacketError(org.activeio.channel.ChannelException)
157 */
158 public void onPacketError(IOException error) {
159 try {
160 buffer.put(error);
161 } catch (InterruptedException e) {
162 Thread.currentThread().interrupt();
163 }
164 }
165
166 public Object narrow(Class target) {
167 if( target.isAssignableFrom(getClass()) ) {
168 return this;
169 }
170 return asynchChannel.narrow(target);
171 }
172
173 public AsynchChannel getAsynchChannel() {
174 return asynchChannel;
175 }
176
177 public String toString() {
178 return asynchChannel.toString();
179 }
180 }