View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activeio.adapter;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  
23  import org.activeio.AsynchChannel;
24  import org.activeio.AsynchChannelListener;
25  import org.activeio.Packet;
26  import org.activeio.SynchChannel;
27  
28  import EDU.oswego.cs.dl.util.concurrent.Channel;
29  import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
30  
31  /***
32   * Adapts a {@see org.activeio.AsynchChannel} so that it provides an 
33   * {@see org.activeio.SynchChannel} interface.  
34   * 
35   * This object buffers asynchronous messages from the {@see org.activeio.AsynchChannel} 
36   * and buffers them in a {@see EDU.oswego.cs.dl.util.concurrent.Channel} util the client receives them.
37   * 
38   * @version $Revision$
39   */
40  final public class AsynchToSynchChannelAdapter implements SynchChannel, AsynchChannelListener {
41  
42      private final AsynchChannel asynchChannel;    
43      private final Channel buffer;
44  
45      static public SynchChannel adapt(org.activeio.Channel channel) {
46          return adapt(channel, new LinkedQueue());
47      }
48  
49      static public SynchChannel adapt(org.activeio.Channel channel, Channel upPacketChannel) {
50  
51          // It might not need adapting
52          if( channel instanceof SynchChannel ) {
53              return (SynchChannel) channel;
54          }
55  
56          // Can we just just undo the adaptor
57          if( channel.getClass() == SynchToAsynchChannelAdapter.class ) {
58              return ((SynchToAsynchChannelAdapter)channel).getSynchChannel();
59          }
60          
61          return new AsynchToSynchChannelAdapter((AsynchChannel)channel, upPacketChannel);        
62      }
63      
64      /***
65       * @deprecated {@see #adapt(AsynchChannel)}
66       */
67      public AsynchToSynchChannelAdapter(AsynchChannel asynchChannel) {
68          this(asynchChannel, new LinkedQueue());
69      }
70      
71      /***
72       * @deprecated {@see #adapt(AsynchChannel, Channel)}
73       */
74      public AsynchToSynchChannelAdapter(AsynchChannel asynchChannel, Channel upPacketChannel){
75          this.asynchChannel = asynchChannel;
76          this.asynchChannel.setAsynchChannelListener(this);
77          this.buffer=upPacketChannel;
78      }
79  
80      /***
81       * @see org.activeio.Channel#write(org.activeio.channel.Packet)
82       */
83      public void write(org.activeio.Packet packet) throws IOException {
84          asynchChannel.write(packet);
85      }
86  
87      /***
88       * @see org.activeio.Channel#flush()
89       */
90      public void flush() throws IOException {
91          asynchChannel.flush();
92      }
93  
94      /***
95       * @see org.activeio.SynchChannel#read(long)
96       */
97      public Packet read(long timeout) throws IOException {
98          try {
99              
100             Object o;
101             if( timeout == NO_WAIT_TIMEOUT ) {
102                 o = buffer.poll(0);
103             } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
104                 o = buffer.take();            
105             } else {
106                 o = buffer.poll(timeout);                        
107             }
108             
109             if( o == null )
110                 return null;
111             
112             if( o instanceof Packet )
113                 return (Packet)o;     
114             
115             Throwable e = (Throwable)o;
116             throw (IOException)new IOException("Asynch error occured: "+e).initCause(e);
117             
118         } catch (InterruptedException e) {
119             throw new InterruptedIOException(e.getMessage());
120         }
121     }
122 
123     /***
124      * @see org.activeio.Disposable#dispose()
125      */
126     public void dispose() {
127         asynchChannel.dispose();
128     }
129 
130     /***
131      * @see org.activeio.Service#start()
132      */
133     public void start() throws IOException {
134         asynchChannel.start();
135     }
136 
137     /***
138      * @see org.activeio.Service#stop(long)
139      */
140     public void stop(long timeout) throws IOException {
141         asynchChannel.stop(timeout);
142     }
143 
144     /***
145      * @see org.activeio.AsynchChannelListener#onPacket(org.activeio.channel.Packet)
146      */
147     public void onPacket(Packet packet) {
148         try {
149             buffer.put(packet);
150         } catch (InterruptedException e) {
151             Thread.currentThread().interrupt();
152         }        
153     }
154 
155     /***
156      * @see org.activeio.AsynchChannelListener#onPacketError(org.activeio.channel.ChannelException)
157      */
158     public void onPacketError(IOException error) {
159         try {
160             buffer.put(error);
161         } catch (InterruptedException e) {
162             Thread.currentThread().interrupt();
163         }        
164     }
165 
166     public Object narrow(Class target) {
167         if( target.isAssignableFrom(getClass()) ) {
168             return this;
169         }
170         return asynchChannel.narrow(target);
171     }    
172 
173     public AsynchChannel getAsynchChannel() {
174         return asynchChannel;
175     }
176     
177     public String toString() {
178         return asynchChannel.toString();
179     }
180 }