Clover coverage report - ActiveIO - 1.0
Coverage timestamp: Fri Apr 22 2005 14:27:22 PDT
file stats: LOC: 260   Methods: 19
NCLOC: 180   Classes: 4
30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover
 
 Source file Conditionals Statements Methods TOTAL
NIOAsynchChannelSelectorManager.java 76.5% 82.1% 84.2% 81%
coverage coverage
 1   
 /**
 2   
  *
 3   
  * Copyright 2003-2004 The Apache Software Foundation
 4   
  *
 5   
  *  Licensed under the Apache License, Version 2.0 (the "License");
 6   
  *  you may not use this file except in compliance with the License.
 7   
  *  You may obtain a copy of the License at
 8   
  *
 9   
  *     http://www.apache.org/licenses/LICENSE-2.0
 10   
  *
 11   
  *  Unless required by applicable law or agreed to in writing, software
 12   
  *  distributed under the License is distributed on an "AS IS" BASIS,
 13   
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14   
  *  See the License for the specific language governing permissions and
 15   
  *  limitations under the License.
 16   
  */
 17   
 
 18   
 package org.activeio.net;
 19   
 
 20   
 import java.io.IOException;
 21   
 import java.nio.channels.ClosedChannelException;
 22   
 import java.nio.channels.SelectionKey;
 23   
 import java.nio.channels.Selector;
 24   
 import java.nio.channels.SocketChannel;
 25   
 import java.util.Iterator;
 26   
 import java.util.LinkedList;
 27   
 import java.util.Set;
 28   
 
 29   
 import org.activeio.ChannelFactory;
 30   
 
 31   
 import EDU.oswego.cs.dl.util.concurrent.Executor;
 32   
 import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
 33   
 
 34   
 /**
 35   
  * The SelectorManager will manage one Selector and the thread that checks the
 36   
  * selector.
 37   
  * 
 38   
  * We may need to consider running more than one thread to check the selector if
 39   
  * servicing the selector takes too long.
 40   
  * 
 41   
  * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
 42   
  */
 43   
 final public class NIOAsynchChannelSelectorManager {
 44   
 
 45   
     static private Executor selectorExecutor = ChannelFactory.DEFAULT_EXECUTOR;
 46   
     static private Executor channelExecutor = ChannelFactory.DEFAULT_EXECUTOR;
 47   
     
 48   
     static private LinkedList freeManagers = new LinkedList();
 49   
     static private LinkedList fullManagers = new LinkedList();
 50   
     private static final int MAX_CHANNELS_PER_SELECTOR  = 50;
 51   
     
 52   
     static {
 53  4
        String os = System.getProperty("os.name");
 54  4
        if( os.startsWith("Linux") ) {
 55  4
            channelExecutor = new DirectExecutor();
 56   
        }
 57   
     } 
 58   
 
 59   
     public static interface SelectorManagerListener {
 60   
         public void onSelect(SocketChannelAsynchChannelSelection selector);
 61   
     }
 62   
 
 63   
     final public class SocketChannelAsynchChannelSelection {
 64   
         
 65   
         private final SelectionKey key;
 66   
         private final SelectorManagerListener listener;
 67   
         private boolean closed;
 68   
         private int interest;
 69   
 
 70  16
         private SocketChannelAsynchChannelSelection(SocketChannel socketChannel, SelectorManagerListener listener)
 71   
                 throws ClosedChannelException {
 72  16
             this.listener = listener;
 73  16
             this.key = socketChannel.register(selector, 0, this);
 74  16
             incrementUseCounter();
 75   
         }
 76   
 
 77  12
         public void setInterestOps(int ops) {
 78  12
                 if( closed ) 
 79  0
                     return;
 80  12
                 interest = ops;
 81  12
              enable();
 82   
         }
 83   
         
 84  2018
         public void enable() {
 85  2018
             if( closed ) 
 86  2
                 return;
 87  2016
             key.interestOps(interest);
 88  2016
             selector.wakeup();
 89   
         }
 90   
 
 91  2006
         public void disable() {
 92  2006
             if( closed ) 
 93  0
                 return;
 94  2006
             key.interestOps(0);
 95   
         }
 96   
 
 97  20
         public void close() {
 98  20
             if( closed ) 
 99  4
                 return;
 100   
             
 101  16
             key.cancel();
 102  16
             decrementUseCounter();
 103  16
             selector.wakeup();
 104  16
             closed=true;
 105   
         }
 106   
         
 107  2006
         public void onSelect() {
 108  2006
             if( !key.isValid() )
 109  0
                 return;
 110  2006
             listener.onSelect(this);
 111   
         }
 112   
 
 113  0
         public boolean isWritable() {
 114  0
             return key.isWritable();
 115   
         }
 116   
 
 117  2006
         public boolean isReadable() {
 118  2006
             return key.isReadable();
 119   
         }
 120   
     }
 121   
 
 122  16
     public synchronized static SocketChannelAsynchChannelSelection register(
 123   
             SocketChannel socketChannel, SelectorManagerListener listener)
 124   
             throws IOException {
 125   
 
 126  16
         NIOAsynchChannelSelectorManager manager = null;
 127  16
         synchronized (freeManagers) {
 128  16
             if (freeManagers.size() > 0)
 129  12
                 manager = (NIOAsynchChannelSelectorManager) freeManagers.getFirst();
 130  16
             if (manager == null) {
 131  4
                 manager = new NIOAsynchChannelSelectorManager();
 132  4
                 freeManagers.addFirst(manager);
 133   
             }
 134   
 
 135   
             // That manager may have filled up.
 136  16
             SocketChannelAsynchChannelSelection selection = manager.new SocketChannelAsynchChannelSelection(
 137   
                     socketChannel, listener);
 138  16
             if (manager.useCounter >= MAX_CHANNELS_PER_SELECTOR) {
 139  0
                 freeManagers.removeFirst();
 140  0
                 fullManagers.addLast(manager);
 141   
             }
 142  16
             return selection;
 143   
         }
 144   
     }
 145   
 
 146  0
     public synchronized static void setSelectorExecutor(Executor executor) {
 147  0
         NIOAsynchChannelSelectorManager.selectorExecutor = executor;
 148   
     }
 149   
     
 150  0
     public synchronized static void setChannelExecutor(Executor executor) {
 151  0
         NIOAsynchChannelSelectorManager.channelExecutor = executor;
 152   
     }
 153   
 
 154   
     private class SelectorWorker implements Runnable {
 155   
                 
 156  8
         public void run() {            
 157   
             
 158  8
             String origName = Thread.currentThread().getName();
 159  8
             try {
 160  8
                Thread.currentThread().setName("Selector Worker: "+getId());
 161  8
                while ( isRunning() ) {
 162   
 
 163  4089
                    int count = selector.select(10);
 164  4089
                    if (count == 0)
 165  2083
                        continue;                
 166  2006
                     if( !isRunning() )
 167  0
                         return;
 168   
 
 169   
                     // Get a java.util.Set containing the SelectionKey objects
 170   
                     // for all channels that are ready for I/O.
 171  2006
                     Set keys = selector.selectedKeys();
 172   
     
 173  2006
                     for (Iterator i = keys.iterator(); i.hasNext();) {                        
 174  2006
                         final SelectionKey key = (SelectionKey) i.next();
 175  2006
                         i.remove();
 176   
 
 177  2006
                         if( !key.isValid() ) 
 178  0
                             continue;
 179   
                         
 180  2006
                         final SocketChannelAsynchChannelSelection s = (SocketChannelAsynchChannelSelection) key.attachment();
 181  2006
                         s.disable();
 182   
                         
 183   
                         // Kick off another thread to find newly selected keys while we process the 
 184   
                         // currently selected keys                
 185  2006
                         channelExecutor.execute(new Runnable() {
 186  2006
                             public void run() {
 187  2006
                                 try {
 188  2006
                                     s.onSelect();
 189  2006
                                     s.enable();
 190   
                                 } catch ( Throwable e ) {
 191  0
                                     System.err.println("ActiveIO unexpected error: ");
 192  0
                                     e.printStackTrace(System.err);
 193   
                                 }
 194   
                             }
 195   
                         });
 196   
                     }
 197   
                     
 198   
                }
 199   
             } catch (Throwable e) {
 200  0
                 System.err.println("Unexpected exception: " + e);
 201  0
                 e.printStackTrace();
 202   
             } finally {
 203  8
                 Thread.currentThread().setName(origName);
 204   
             }
 205   
         }
 206   
     }
 207   
 
 208   
     /**
 209   
      * The selector used to wait for non-blocking events.
 210   
      */
 211   
     private Selector selector;
 212   
 
 213   
     /**
 214   
      * How many SelectionKeys does the selector have active.
 215   
      */
 216   
     private int useCounter;
 217   
     private int id = getNextId();
 218   
     private static int nextId;
 219   
 
 220  4
     private NIOAsynchChannelSelectorManager() throws IOException {
 221  4
         selector = Selector.open();
 222   
     }
 223   
     
 224  4
     synchronized private static int getNextId() {
 225  4
         return nextId++;
 226   
     }
 227   
 
 228  8
     private int getId() {
 229  8
         return id ;
 230   
     }
 231   
 
 232  16
     synchronized private void incrementUseCounter() {
 233  16
         useCounter++;
 234  16
         if (useCounter == 1) {
 235  8
             try {
 236  8
                 selectorExecutor.execute(new SelectorWorker());
 237   
             } catch (InterruptedException e) {
 238  0
                 Thread.currentThread().interrupt();
 239   
             }
 240   
         }
 241   
     }
 242   
 
 243  16
     synchronized private void decrementUseCounter() {
 244  16
         useCounter--;
 245  16
          synchronized(freeManagers) {                     
 246  16
                if( useCounter == 0 ) {
 247  8
                     freeManagers.remove(this);
 248   
                 }                  
 249  8
                else if( useCounter < MAX_CHANNELS_PER_SELECTOR ) {
 250  8
                     fullManagers.remove(this);
 251  8
                     freeManagers.addLast(this);
 252   
                 }                   
 253   
         }
 254   
     }
 255   
 
 256  6103
     synchronized private boolean isRunning() {
 257  6103
         return useCounter > 0;
 258   
     }
 259   
 }
 260