View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activeio.adapter;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  
23  import org.activeio.AsynchChannel;
24  import org.activeio.AsynchChannelListener;
25  import org.activeio.Channel;
26  import org.activeio.ChannelFactory;
27  import org.activeio.Packet;
28  import org.activeio.Service;
29  import org.activeio.SynchChannel;
30  import org.activeio.packet.EOSPacket;
31  
32  import EDU.oswego.cs.dl.util.concurrent.Executor;
33  import EDU.oswego.cs.dl.util.concurrent.Latch;
34  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
35  
36  /***
37   * Adapts a {@see org.activeio.SynchChannel} so that it provides an 
38   * {@see org.activeio.AsynchChannel} interface.  When this channel
39   * is started, a background thread is used to poll the {@see org.activeio.SynchChannel}
40   *  for packets comming up the channel which are then delivered to the 
41   * {@see org.activeio.ChannelConsumer}.
42   * 
43   * @version $Revision$
44   */
45  final public class SynchToAsynchChannelAdapter implements AsynchChannel, Runnable {
46  
47      private final SynchronizedBoolean running = new SynchronizedBoolean(false);
48      private final SynchChannel synchChannel;
49      private final Executor executor;
50      private AsynchChannelListener channelListener;
51      private Latch doneLatch;
52      
53      
54      static public AsynchChannel adapt(Channel channel) {
55          return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
56      }
57  
58      static public AsynchChannel adapt(Channel channel, Executor executor) {
59          
60          // It might not need adapting
61          if( channel instanceof AsynchChannel ) {
62              return (AsynchChannel) channel;
63          }
64          
65          // Can we just just undo the adaptor
66          if( channel.getClass() == SynchToAsynchChannelAdapter.class ) {
67              return ((AsynchToSynchChannelAdapter)channel).getAsynchChannel();
68          }
69          
70          return new SynchToAsynchChannelAdapter((SynchChannel) channel, executor);
71          
72      }
73  
74      
75      /***
76       * @deprecated {@see #adapt(SynchChannel)}
77       */
78      public SynchToAsynchChannelAdapter(SynchChannel synchChannel) {
79          this(synchChannel, ChannelFactory.DEFAULT_EXECUTOR);
80      }
81  
82      /***
83       * @deprecated {@see #adapt(SynchChannel, Executor)}
84       */
85      public SynchToAsynchChannelAdapter(SynchChannel synchChannel, Executor executor) {
86          this.synchChannel = synchChannel;
87          this.executor = executor;
88      }
89  
90      synchronized public void start() throws IOException {
91          if (running.commit(false, true)) {
92              
93              if (channelListener == null)
94                  throw new IllegalStateException("UpPacketListener must be set before object can be started.");
95              
96              synchChannel.start();
97  
98              try {
99                  doneLatch = new Latch();
100                 executor.execute(this);
101             } catch (InterruptedException e) {
102                 throw new InterruptedIOException(e.getMessage());
103             }
104 
105         }
106     }
107 
108     synchronized public void stop(long timeout) throws IOException {
109         if (running.commit(true, false)) {
110             try {
111                 if( timeout == NO_WAIT_TIMEOUT ) {
112                     synchChannel.stop(NO_WAIT_TIMEOUT);
113                 } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
114                     doneLatch.acquire();
115                     synchChannel.stop(WAIT_FOREVER_TIMEOUT);
116                 } else {
117                     
118                     long start = System.currentTimeMillis();
119                     if( doneLatch.attempt(timeout) ) {
120                         timeout -= (System.currentTimeMillis() - start);
121                     } else {
122                         timeout=0;
123                     }
124                     
125                     if( timeout <= 0 ) {
126                         synchChannel.stop(NO_WAIT_TIMEOUT);
127                     } else {
128                         synchChannel.stop(timeout);
129                     }
130                 }
131             } catch (IOException e) {
132                 throw e;
133             } catch (Throwable e) {
134                 throw (IOException)new IOException("stop failed: " + e.getMessage()).initCause(e);
135             }
136         }
137     }
138 
139     /***
140      * reads packets from a Socket
141      */
142     public void run() {
143         
144         // Change the thread name.
145         String oldName = Thread.currentThread().getName();        
146         Thread.currentThread().setName( synchChannel.toString() );        
147         try {
148 	        while (running.get()) {
149 	            try {
150 	                Packet packet = synchChannel.read(500);
151 	                if( packet==null )
152 	                    continue;    
153                     
154                     if( packet == EOSPacket.EOS_PACKET ) {
155                         channelListener.onPacket(packet);
156                         return;
157                     }
158                     
159                     if( packet.hasRemaining() ) {
160                         channelListener.onPacket(packet);
161                     }
162                     
163 	            } catch (IOException e) {
164 	                channelListener.onPacketError(e);
165 	            } catch (Throwable e) {
166 	                channelListener.onPacketError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
167 	            }
168 	        }
169         } finally {
170             if( doneLatch!=null )
171                 doneLatch.release();
172             Thread.currentThread().setName(oldName);
173         }
174     }
175 
176     /***
177      * @see org.activeio.AsynchChannel#setAsynchChannelListener(org.activeio.UpPacketListener)
178      */
179     public void setAsynchChannelListener(AsynchChannelListener channelListener) {
180         if (running.get())
181             throw new IllegalStateException("Cannot change the UpPacketListener while the object is running.");
182         this.channelListener = channelListener;
183     }
184 
185     /***
186      * @see org.activeio.Channel#write(org.activeio.channel.Packet)
187      */
188     public void write(org.activeio.Packet packet) throws IOException {
189         synchChannel.write(packet);
190     }
191 
192     /***
193      * @see org.activeio.Channel#flush()
194      */
195     public void flush() throws IOException {
196         synchChannel.flush();
197     }
198 
199     /***
200      * @see org.activeio.Disposable#dispose()
201      */
202     public void dispose() {
203         try {
204             stop(Service.NO_WAIT_TIMEOUT);
205         } catch ( IOException ignore) {
206         }
207         synchChannel.dispose();        
208     }
209 
210     public AsynchChannelListener getAsynchChannelListener() {
211         return channelListener;
212     }
213     
214     public Object narrow(Class target) {
215         if( target.isAssignableFrom(getClass()) ) {
216             return this;
217         }
218         return synchChannel.narrow(target);
219     }    
220     
221     public SynchChannel getSynchChannel() {
222         return synchChannel;
223     }
224 
225     public String toString() {
226         return synchChannel.toString();
227     }
228 }