Clover coverage report - ActiveIO - 1.0
Coverage timestamp: Fri Apr 22 2005 14:27:22 PDT
file stats: LOC: 203   Methods: 14
NCLOC: 137   Classes: 1
30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover
 
 Source file Conditionals Statements Methods TOTAL
SynchToAsynchChannelServerAdapter.java 47.1% 65.1% 85.7% 62.2%
coverage coverage
 1   
 /**
 2   
  * 
 3   
  * Copyright 2004 Protique Ltd
 4   
  * 
 5   
  * Licensed under the Apache License, Version 2.0 (the "License"); 
 6   
  * you may not use this file except in compliance with the License. 
 7   
  * You may obtain a copy of the License at 
 8   
  * 
 9   
  * http://www.apache.org/licenses/LICENSE-2.0
 10   
  * 
 11   
  * Unless required by applicable law or agreed to in writing, software
 12   
  * distributed under the License is distributed on an "AS IS" BASIS, 
 13   
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14   
  * See the License for the specific language governing permissions and 
 15   
  * limitations under the License. 
 16   
  * 
 17   
  **/
 18   
 
 19   
 package org.activeio.adapter;
 20   
 
 21   
 import java.io.IOException;
 22   
 import java.io.InterruptedIOException;
 23   
 import java.net.URI;
 24   
 
 25   
 import org.activeio.AcceptListener;
 26   
 import org.activeio.AsynchChannelServer;
 27   
 import org.activeio.Channel;
 28   
 import org.activeio.ChannelFactory;
 29   
 import org.activeio.ChannelServer;
 30   
 import org.activeio.Disposable;
 31   
 import org.activeio.Service;
 32   
 import org.activeio.SynchChannelServer;
 33   
 
 34   
 import EDU.oswego.cs.dl.util.concurrent.Executor;
 35   
 import EDU.oswego.cs.dl.util.concurrent.Latch;
 36   
 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
 37   
 
 38   
 /**
 39   
  * Adapts a {@see org.activeio,SynchChannelServer} so that it provides an 
 40   
  * {@see org.activeio.AsynchChannelServer} interface.  When this channel
 41   
  * is started, a background thread is used to poll the (@see org.activeio.SynchChannelServer}
 42   
  * for accepted channel connections which are then delivered to the {@see org.activeio.AcceptConsumer}.
 43   
  * 
 44   
  * @version $Revision$
 45   
  */
 46   
 final public class SynchToAsynchChannelServerAdapter implements AsynchChannelServer, Runnable {
 47   
 
 48   
     private final SynchChannelServer synchChannelServer;
 49   
     private final SynchronizedBoolean running = new SynchronizedBoolean(false);
 50   
     private final Executor executor;
 51   
     private AcceptListener acceptListener;
 52   
     private Latch doneLatch;
 53   
     
 54   
     
 55  8
     static public AsynchChannelServer adapt(ChannelServer channel) {
 56  8
         return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
 57   
     }
 58   
 
 59  8
     static public AsynchChannelServer adapt(ChannelServer channel, Executor executor) {
 60   
 
 61   
         // It might not need adapting
 62  8
         if( channel instanceof AsynchChannelServer ) {
 63  0
             return (AsynchChannelServer) channel;
 64   
         }
 65   
 
 66   
         // Can we just just undo the adaptor
 67  8
         if( channel.getClass() == SynchToAsynchChannelAdapter.class ) {
 68  0
             return ((AsynchToSynchChannelServerAdapter)channel).getAsynchChannelServer();
 69   
         }
 70   
         
 71  8
         return new SynchToAsynchChannelServerAdapter((SynchChannelServer)channel, executor);        
 72   
     }
 73   
     
 74  2
     public SynchToAsynchChannelServerAdapter(SynchChannelServer synchServer) {
 75  2
         this(synchServer, ChannelFactory.DEFAULT_EXECUTOR);
 76   
     }
 77   
     
 78  30
     public SynchToAsynchChannelServerAdapter(SynchChannelServer synchServer, Executor executor) {
 79  30
         this.synchChannelServer = synchServer;        
 80  30
         this.executor=executor;
 81   
     }
 82   
     
 83  30
     synchronized public void start() throws IOException {        
 84  30
         if (running.commit(false, true)) {
 85   
             
 86  30
             if( acceptListener == null )
 87  0
                 throw new IllegalStateException("AcceptListener must be set before object can be started.");
 88   
 
 89  30
             synchChannelServer.start();
 90   
             
 91  30
             try {
 92  30
                 doneLatch = new Latch();
 93  30
                 executor.execute(this);
 94   
             } catch (InterruptedException e) {
 95  0
                 throw new InterruptedIOException(e.getMessage());
 96   
             }
 97   
         }
 98   
     }
 99   
 
 100  30
     synchronized public void stop(long timeout) throws IOException {
 101  30
         if (running.commit(true, false)) {
 102  30
             try {
 103   
                 
 104  30
                 if( timeout == NO_WAIT_TIMEOUT ) {
 105  30
                     synchChannelServer.stop(NO_WAIT_TIMEOUT);
 106  0
                 } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
 107  0
                     doneLatch.acquire();
 108  0
                     synchChannelServer.stop(WAIT_FOREVER_TIMEOUT);
 109   
                 } else {
 110   
                     
 111  0
                     long start = System.currentTimeMillis();
 112  0
                     if( doneLatch.attempt(timeout) ) {
 113  0
                         timeout -= (System.currentTimeMillis() - start);
 114   
                     } else {
 115  0
                         timeout=0;
 116   
                     }
 117   
                     
 118  0
                     if( timeout <= 0 ) {
 119  0
                         synchChannelServer.stop(NO_WAIT_TIMEOUT);
 120   
                     } else {
 121  0
                         synchChannelServer.stop(timeout);
 122   
                     }
 123   
                 }
 124   
                 
 125   
             } catch (IOException e) {
 126  0
                 throw e;
 127   
             } catch (Throwable e) {
 128  0
                 throw (IOException)new IOException("stop failed: " + e.getMessage()).initCause(e);
 129   
             }
 130   
         }
 131   
     }
 132   
 
 133  30
     public void run() {
 134   
         // Change the thread name.
 135  30
         String oldName = Thread.currentThread().getName();        
 136  30
         Thread.currentThread().setName( synchChannelServer.toString() );
 137  30
         try {
 138  30
             while (running.get()) {
 139  130
                 try {
 140  130
                     Channel channel = synchChannelServer.accept(500);
 141  101
                     if( channel == null )
 142  65
                         continue;                
 143  36
                     acceptListener.onAccept(channel);
 144   
                 } catch (IOException e) {
 145  22
                     if( running.get() )
 146  0
                         acceptListener.onAcceptError(e);        
 147   
                 } catch (Throwable e) {                
 148  2
                     if( running.get() )
 149  0
                         acceptListener.onAcceptError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
 150   
                 }
 151   
             }
 152   
         } finally {
 153  25
             if( doneLatch!=null )
 154  25
                 doneLatch.release();
 155  25
             Thread.currentThread().setName(oldName);            
 156   
         }
 157   
     }
 158   
 
 159   
     /**
 160   
      * @see org.activeio.AsynchChannelServer#setAcceptListener(org.activeio.AcceptListener)
 161   
      */
 162  30
     public void setAcceptListener(AcceptListener acceptListener) {
 163  30
         if(running.get()) 
 164  0
             throw new IllegalStateException("Cannot change the AcceptListener while the object is running.");        
 165  30
         this.acceptListener = acceptListener;
 166   
     }
 167   
 
 168   
     /**
 169   
      * @see org.activeio.Disposable#dispose()
 170   
      */
 171  30
     public void dispose() {
 172  30
         try {
 173  30
             stop(Service.NO_WAIT_TIMEOUT);
 174   
         } catch ( IOException ignore) {
 175   
         }
 176  30
         if( synchChannelServer instanceof Disposable ) {
 177  30
             ((Disposable)synchChannelServer).dispose();
 178   
         }
 179   
     }
 180   
 
 181  6
     public URI getBindURI() {
 182  6
         return synchChannelServer.getBindURI();
 183   
     }
 184   
 
 185  64
     public URI getConnectURI() {
 186  64
         return synchChannelServer.getConnectURI();
 187   
     }
 188   
 
 189  0
     public SynchChannelServer getSynchChannelServer() {
 190  0
         return synchChannelServer;
 191   
     }
 192   
     
 193  4
     public Object narrow(Class target) {
 194  4
         if( target.isAssignableFrom(getClass()) ) {
 195  0
             return this;
 196   
         }
 197  4
         return synchChannelServer.narrow(target);
 198   
     }    
 199   
     
 200  0
     public String toString() {
 201  0
         return synchChannelServer.toString();
 202   
     }
 203   
 }