View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.activeio.filter;
18  
19  import java.io.IOException;
20  import java.io.InterruptedIOException;
21  
22  import org.activeio.FilterSynchChannel;
23  import org.activeio.Packet;
24  import org.activeio.SynchChannel;
25  import org.activeio.SynchChannelServer;
26  
27  import EDU.oswego.cs.dl.util.concurrent.Mutex;
28  import EDU.oswego.cs.dl.util.concurrent.Sync;
29  
30  /***
31   * Used to synchronize concurrent access to a SynchChannel.  
32   * 
33   * Uses two different {@see EDU.oswego.cs.dl.util.concurrent.Sync} objects
34   * for write and read operations.  All other operations such as {@see #stop(long)}
35   * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
36   * object instance.
37   * 
38   */
39  public class SynchornizedSynchChannel extends FilterSynchChannel {
40  
41      private final Sync readLock;
42      private final Sync writeLock;
43  
44      public SynchornizedSynchChannel(SynchChannel next) {
45          this(next, new Mutex(), new Mutex());
46      }
47      
48      public SynchornizedSynchChannel(SynchChannel next, Sync readLock, Sync writeLock) {
49          super(next);
50          this.readLock = readLock;
51          this.writeLock = writeLock;
52      }
53      
54      public Packet read(long timeout) throws IOException {
55          try {            
56              
57              if( timeout==SynchChannelServer.WAIT_FOREVER_TIMEOUT ) {
58                  readLock.acquire();
59              } else {
60                  long start = System.currentTimeMillis();
61                  if( !readLock.attempt(0) ) {
62                      return null;
63                  }
64                  // Adjust the resulting timeout down to account for time taken to 
65                  // get the readLock.
66                  timeout = Math.max(0, timeout-(System.currentTimeMillis()-start));
67              }
68              
69          } catch (InterruptedException e) {
70              throw new InterruptedIOException(e.getMessage());            
71          }
72          
73          try {
74              return getNext().read(timeout);            
75          } finally {
76              readLock.release();
77          }
78      }
79      
80      public void write(Packet packet) throws IOException {
81          try {                        
82              writeLock.acquire();
83          } catch (InterruptedException e) {
84              throw new InterruptedIOException(e.getMessage());
85          }
86          try {
87              getNext().write(packet);            
88          } finally {
89              writeLock.release();
90          }
91      }
92      
93      public void flush() throws IOException {
94          try {                        
95              writeLock.acquire();
96          } catch (InterruptedException e) {
97              throw new InterruptedIOException(e.getMessage());
98          }
99          try {
100             getNext().flush();            
101         } finally {
102             writeLock.release();
103         }
104     }
105 
106     synchronized public void dispose() {
107         super.dispose();
108     }
109 
110     synchronized public Object narrow(Class target) {
111         return super.narrow(target);
112     }
113 
114     synchronized public void start() throws IOException {
115         super.start();
116     }
117 
118     synchronized public void stop(long timeout) throws IOException {
119         super.stop(timeout);
120     }
121     
122     public Sync getReadLock() {
123         return readLock;
124     }
125     
126     public Sync getWriteLock() {
127         return writeLock;
128     }
129 }