View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.activeio.filter;
18  
19  import java.io.IOException;
20  import java.io.InterruptedIOException;
21  
22  import org.activeio.AsynchChannel;
23  import org.activeio.FilterAsynchChannel;
24  import org.activeio.Packet;
25  
26  import EDU.oswego.cs.dl.util.concurrent.Mutex;
27  import EDU.oswego.cs.dl.util.concurrent.Sync;
28  
29  /***
30   * Used to synchronize concurrent access to an ASynchChannel.  
31   * 
32   * Uses a {@see EDU.oswego.cs.dl.util.concurrent.Sync} object
33   * for write operations.  All other operations such as {@see #stop(long)}
34   * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
35   * object instance.  It is assumed that the Async message delivery is not 
36   * concurrent and therefore does not require synchronization.
37   * 
38   */
39  public class SynchornizedAsynchChannel extends FilterAsynchChannel {
40  
41      private final Sync writeLock;
42  
43      public SynchornizedAsynchChannel(AsynchChannel next) {
44          this(next, new Mutex());
45      }
46      
47      public SynchornizedAsynchChannel(AsynchChannel next, Sync writeLock) {
48          super(next);
49          this.writeLock = writeLock;
50      }    
51      
52      public void write(Packet packet) throws IOException {
53          try {                        
54              writeLock.acquire();
55          } catch (InterruptedException e) {
56              throw new InterruptedIOException(e.getMessage());
57          }
58          try {
59              getNext().write(packet);            
60          } finally {
61              writeLock.release();
62          }
63      }
64      
65      public void flush() throws IOException {
66          try {                        
67              writeLock.acquire();
68          } catch (InterruptedException e) {
69              throw new InterruptedIOException(e.getMessage());
70          }
71          try {
72              getNext().flush();            
73          } finally {
74              writeLock.release();
75          }
76      }
77  
78      synchronized public void dispose() {
79          super.dispose();
80      }
81  
82      synchronized public Object narrow(Class target) {
83          return super.narrow(target);
84      }
85  
86      synchronized public void start() throws IOException {
87          super.start();
88      }
89  
90      synchronized public void stop(long timeout) throws IOException {
91          super.stop(timeout);
92      }
93      
94      public Sync getWriteLock() {
95          return writeLock;
96      }
97  }