|
|||||||||||||||||||
30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover | |||||||||||||||||||
Source file | Conditionals | Statements | Methods | TOTAL | |||||||||||||||
NIOAsynchChannelSelectorManager.java | 76.5% | 82.1% | 84.2% | 81% |
|
1 |
/**
|
|
2 |
*
|
|
3 |
* Copyright 2003-2004 The Apache Software Foundation
|
|
4 |
*
|
|
5 |
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
6 |
* you may not use this file except in compliance with the License.
|
|
7 |
* You may obtain a copy of the License at
|
|
8 |
*
|
|
9 |
* http://www.apache.org/licenses/LICENSE-2.0
|
|
10 |
*
|
|
11 |
* Unless required by applicable law or agreed to in writing, software
|
|
12 |
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
13 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
14 |
* See the License for the specific language governing permissions and
|
|
15 |
* limitations under the License.
|
|
16 |
*/
|
|
17 |
|
|
18 |
package org.activeio.net;
|
|
19 |
|
|
20 |
import java.io.IOException;
|
|
21 |
import java.nio.channels.ClosedChannelException;
|
|
22 |
import java.nio.channels.SelectionKey;
|
|
23 |
import java.nio.channels.Selector;
|
|
24 |
import java.nio.channels.SocketChannel;
|
|
25 |
import java.util.Iterator;
|
|
26 |
import java.util.LinkedList;
|
|
27 |
import java.util.Set;
|
|
28 |
|
|
29 |
import org.activeio.ChannelFactory;
|
|
30 |
|
|
31 |
import EDU.oswego.cs.dl.util.concurrent.Executor;
|
|
32 |
import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
|
|
33 |
|
|
34 |
/**
|
|
35 |
* The SelectorManager will manage one Selector and the thread that checks the
|
|
36 |
* selector.
|
|
37 |
*
|
|
38 |
* We may need to consider running more than one thread to check the selector if
|
|
39 |
* servicing the selector takes too long.
|
|
40 |
*
|
|
41 |
* @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
|
|
42 |
*/
|
|
43 |
final public class NIOAsynchChannelSelectorManager { |
|
44 |
|
|
45 |
static private Executor selectorExecutor = ChannelFactory.DEFAULT_EXECUTOR; |
|
46 |
static private Executor channelExecutor = ChannelFactory.DEFAULT_EXECUTOR; |
|
47 |
|
|
48 |
static private LinkedList freeManagers = new LinkedList(); |
|
49 |
static private LinkedList fullManagers = new LinkedList(); |
|
50 |
private static final int MAX_CHANNELS_PER_SELECTOR = 50; |
|
51 |
|
|
52 |
static {
|
|
53 | 4 |
String os = System.getProperty("os.name");
|
54 | 4 |
if( os.startsWith("Linux") ) { |
55 | 4 |
channelExecutor = new DirectExecutor();
|
56 |
} |
|
57 |
} |
|
58 |
|
|
59 |
public static interface SelectorManagerListener { |
|
60 |
public void onSelect(SocketChannelAsynchChannelSelection selector); |
|
61 |
} |
|
62 |
|
|
63 |
final public class SocketChannelAsynchChannelSelection { |
|
64 |
|
|
65 |
private final SelectionKey key;
|
|
66 |
private final SelectorManagerListener listener;
|
|
67 |
private boolean closed; |
|
68 |
private int interest; |
|
69 |
|
|
70 | 16 |
private SocketChannelAsynchChannelSelection(SocketChannel socketChannel, SelectorManagerListener listener)
|
71 |
throws ClosedChannelException {
|
|
72 | 16 |
this.listener = listener;
|
73 | 16 |
this.key = socketChannel.register(selector, 0, this); |
74 | 16 |
incrementUseCounter(); |
75 |
} |
|
76 |
|
|
77 | 12 |
public void setInterestOps(int ops) { |
78 | 12 |
if( closed )
|
79 | 0 |
return;
|
80 | 12 |
interest = ops; |
81 | 12 |
enable(); |
82 |
} |
|
83 |
|
|
84 | 2018 |
public void enable() { |
85 | 2018 |
if( closed )
|
86 | 2 |
return;
|
87 | 2016 |
key.interestOps(interest); |
88 | 2016 |
selector.wakeup(); |
89 |
} |
|
90 |
|
|
91 | 2006 |
public void disable() { |
92 | 2006 |
if( closed )
|
93 | 0 |
return;
|
94 | 2006 |
key.interestOps(0); |
95 |
} |
|
96 |
|
|
97 | 20 |
public void close() { |
98 | 20 |
if( closed )
|
99 | 4 |
return;
|
100 |
|
|
101 | 16 |
key.cancel(); |
102 | 16 |
decrementUseCounter(); |
103 | 16 |
selector.wakeup(); |
104 | 16 |
closed=true;
|
105 |
} |
|
106 |
|
|
107 | 2006 |
public void onSelect() { |
108 | 2006 |
if( !key.isValid() )
|
109 | 0 |
return;
|
110 | 2006 |
listener.onSelect(this);
|
111 |
} |
|
112 |
|
|
113 | 0 |
public boolean isWritable() { |
114 | 0 |
return key.isWritable();
|
115 |
} |
|
116 |
|
|
117 | 2006 |
public boolean isReadable() { |
118 | 2006 |
return key.isReadable();
|
119 |
} |
|
120 |
} |
|
121 |
|
|
122 | 16 |
public synchronized static SocketChannelAsynchChannelSelection register( |
123 |
SocketChannel socketChannel, SelectorManagerListener listener) |
|
124 |
throws IOException {
|
|
125 |
|
|
126 | 16 |
NIOAsynchChannelSelectorManager manager = null;
|
127 | 16 |
synchronized (freeManagers) {
|
128 | 16 |
if (freeManagers.size() > 0)
|
129 | 12 |
manager = (NIOAsynchChannelSelectorManager) freeManagers.getFirst(); |
130 | 16 |
if (manager == null) { |
131 | 4 |
manager = new NIOAsynchChannelSelectorManager();
|
132 | 4 |
freeManagers.addFirst(manager); |
133 |
} |
|
134 |
|
|
135 |
// That manager may have filled up.
|
|
136 | 16 |
SocketChannelAsynchChannelSelection selection = manager.new SocketChannelAsynchChannelSelection(
|
137 |
socketChannel, listener); |
|
138 | 16 |
if (manager.useCounter >= MAX_CHANNELS_PER_SELECTOR) {
|
139 | 0 |
freeManagers.removeFirst(); |
140 | 0 |
fullManagers.addLast(manager); |
141 |
} |
|
142 | 16 |
return selection;
|
143 |
} |
|
144 |
} |
|
145 |
|
|
146 | 0 |
public synchronized static void setSelectorExecutor(Executor executor) { |
147 | 0 |
NIOAsynchChannelSelectorManager.selectorExecutor = executor; |
148 |
} |
|
149 |
|
|
150 | 0 |
public synchronized static void setChannelExecutor(Executor executor) { |
151 | 0 |
NIOAsynchChannelSelectorManager.channelExecutor = executor; |
152 |
} |
|
153 |
|
|
154 |
private class SelectorWorker implements Runnable { |
|
155 |
|
|
156 | 8 |
public void run() { |
157 |
|
|
158 | 8 |
String origName = Thread.currentThread().getName(); |
159 | 8 |
try {
|
160 | 8 |
Thread.currentThread().setName("Selector Worker: "+getId());
|
161 | 8 |
while ( isRunning() ) {
|
162 |
|
|
163 | 4089 |
int count = selector.select(10);
|
164 | 4089 |
if (count == 0)
|
165 | 2083 |
continue;
|
166 | 2006 |
if( !isRunning() )
|
167 | 0 |
return;
|
168 |
|
|
169 |
// Get a java.util.Set containing the SelectionKey objects
|
|
170 |
// for all channels that are ready for I/O.
|
|
171 | 2006 |
Set keys = selector.selectedKeys(); |
172 |
|
|
173 | 2006 |
for (Iterator i = keys.iterator(); i.hasNext();) {
|
174 | 2006 |
final SelectionKey key = (SelectionKey) i.next(); |
175 | 2006 |
i.remove(); |
176 |
|
|
177 | 2006 |
if( !key.isValid() )
|
178 | 0 |
continue;
|
179 |
|
|
180 | 2006 |
final SocketChannelAsynchChannelSelection s = (SocketChannelAsynchChannelSelection) key.attachment(); |
181 | 2006 |
s.disable(); |
182 |
|
|
183 |
// Kick off another thread to find newly selected keys while we process the
|
|
184 |
// currently selected keys
|
|
185 | 2006 |
channelExecutor.execute(new Runnable() {
|
186 | 2006 |
public void run() { |
187 | 2006 |
try {
|
188 | 2006 |
s.onSelect(); |
189 | 2006 |
s.enable(); |
190 |
} catch ( Throwable e ) {
|
|
191 | 0 |
System.err.println("ActiveIO unexpected error: ");
|
192 | 0 |
e.printStackTrace(System.err); |
193 |
} |
|
194 |
} |
|
195 |
}); |
|
196 |
} |
|
197 |
|
|
198 |
} |
|
199 |
} catch (Throwable e) {
|
|
200 | 0 |
System.err.println("Unexpected exception: " + e);
|
201 | 0 |
e.printStackTrace(); |
202 |
} finally {
|
|
203 | 8 |
Thread.currentThread().setName(origName); |
204 |
} |
|
205 |
} |
|
206 |
} |
|
207 |
|
|
208 |
/**
|
|
209 |
* The selector used to wait for non-blocking events.
|
|
210 |
*/
|
|
211 |
private Selector selector;
|
|
212 |
|
|
213 |
/**
|
|
214 |
* How many SelectionKeys does the selector have active.
|
|
215 |
*/
|
|
216 |
private int useCounter; |
|
217 |
private int id = getNextId(); |
|
218 |
private static int nextId; |
|
219 |
|
|
220 | 4 |
private NIOAsynchChannelSelectorManager() throws IOException { |
221 | 4 |
selector = Selector.open(); |
222 |
} |
|
223 |
|
|
224 | 4 |
synchronized private static int getNextId() { |
225 | 4 |
return nextId++;
|
226 |
} |
|
227 |
|
|
228 | 8 |
private int getId() { |
229 | 8 |
return id ;
|
230 |
} |
|
231 |
|
|
232 | 16 |
synchronized private void incrementUseCounter() { |
233 | 16 |
useCounter++; |
234 | 16 |
if (useCounter == 1) {
|
235 | 8 |
try {
|
236 | 8 |
selectorExecutor.execute(new SelectorWorker());
|
237 |
} catch (InterruptedException e) {
|
|
238 | 0 |
Thread.currentThread().interrupt(); |
239 |
} |
|
240 |
} |
|
241 |
} |
|
242 |
|
|
243 | 16 |
synchronized private void decrementUseCounter() { |
244 | 16 |
useCounter--; |
245 | 16 |
synchronized(freeManagers) {
|
246 | 16 |
if( useCounter == 0 ) {
|
247 | 8 |
freeManagers.remove(this);
|
248 |
} |
|
249 | 8 |
else if( useCounter < MAX_CHANNELS_PER_SELECTOR ) { |
250 | 8 |
fullManagers.remove(this);
|
251 | 8 |
freeManagers.addLast(this);
|
252 |
} |
|
253 |
} |
|
254 |
} |
|
255 |
|
|
256 | 6103 |
synchronized private boolean isRunning() { |
257 | 6103 |
return useCounter > 0;
|
258 |
} |
|
259 |
} |
|
260 |
|
|