View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activeio.net;
19  
20  import java.io.IOException;
21  import java.lang.reflect.InvocationHandler;
22  import java.lang.reflect.InvocationTargetException;
23  import java.lang.reflect.Method;
24  import java.lang.reflect.Proxy;
25  import java.net.URI;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.Map;
29  
30  import org.activeio.AsynchChannel;
31  import org.activeio.AsynchChannelFactory;
32  import org.activeio.AsynchChannelListener;
33  import org.activeio.AsynchChannelServer;
34  import org.activeio.Packet;
35  
36  /***
37   * 
38   * @version $Revision$
39   */
40  final public class VMPipeAsynchChannelFactory implements AsynchChannelFactory {
41      
42      //
43      // We do all this crazy stuff of looking the server map using System
44      // properties
45      // because this class could be loaded multiple times in different
46      // classloaders.
47      //
48      private static final String SERVER_MAP_LOCATION = VMPipeAsynchChannelFactory.class.getName() + ".SERVER_MAP";
49  
50      private static final Map SERVER_MAP;
51      static {
52          Map m = null;
53          m = (Map) System.getProperties().get(SERVER_MAP_LOCATION);
54          if (m == null) {
55              m = Collections.synchronizedMap(new HashMap());
56              System.getProperties().put(SERVER_MAP_LOCATION, m);
57          }
58          SERVER_MAP = m;
59      }
60  
61      private final static ClassLoader MY_CLASSLOADER = Packet.class.getClassLoader();
62      
63      
64      /***
65       * Used to marshal calls to a PipeChannel in a different classloader.
66       */
67      static public class ClassloaderAsynchChannelAdapter implements AsynchChannel {
68  
69          private final ClassLoader cl;
70          private final Object channel;
71          private final Method writeMethod;
72          private final Method setListenerMethod;
73          private final Class listenerClazz;
74          private final Class packetClazz;
75          private final Object listenerProxy;
76          private final Method duplicateMethod;
77          private final Method startMethod;
78          private final Method stopMethod;
79          private final Method disposeMethod;
80  
81          private AsynchChannelListener channelListener;
82  
83          public class ListenerProxyHandler implements InvocationHandler {
84              public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
85                  switch (method.getName().length()) {
86                  case 8: // onPacket
87                      Object packet = duplicateMethod.invoke(args[0], new Object[]{MY_CLASSLOADER});  
88                      channelListener.onPacket((Packet) packet);
89                      break;
90                  case 13: // onPacketError
91                      channelListener.onPacketError((IOException) args[0]);
92                      break;
93                  default:
94                      channelListener.onPacketError(new IOException("Unknown proxy method invocation: "+method.getName()));
95                  }
96                  return null;
97              }
98          }
99  
100         public ClassloaderAsynchChannelAdapter(Object channel) throws SecurityException, NoSuchMethodException,
101                 ClassNotFoundException {
102             this.channel = channel;
103             Class clazz = channel.getClass();
104             cl = clazz.getClassLoader();
105 
106             listenerClazz = cl.loadClass(AsynchChannelListener.class.getName());
107             packetClazz = cl.loadClass(Packet.class.getName());
108             writeMethod = clazz.getMethod("write", new Class[] { packetClazz });
109             startMethod = clazz.getMethod("start", new Class[] { });
110             stopMethod = clazz.getMethod("stop", new Class[] { long.class });
111             disposeMethod = clazz.getMethod("dispose", new Class[] { });
112 
113             setListenerMethod = clazz.getMethod("setAsynchChannelListener", new Class[] { listenerClazz });
114             duplicateMethod = packetClazz.getMethod("duplicate", new Class[] { ClassLoader.class });
115 
116             ListenerProxyHandler handler = new ListenerProxyHandler();
117             listenerProxy = Proxy.newProxyInstance(cl, new Class[] { listenerClazz }, handler);
118         }
119 
120         public void write(Packet packet) throws IOException {
121             callIOExceptionMethod(writeMethod, new Object[] { packet.duplicate(cl) });
122         }
123 
124         public void setAsynchChannelListener(AsynchChannelListener channelListener) {
125             this.channelListener = channelListener;
126             callMethod(setListenerMethod, new Object[] { channelListener == null ? null : listenerProxy });
127         }
128 
129         public AsynchChannelListener getAsynchChannelListener() {
130             return channelListener;
131         }
132 
133         public void dispose() {
134             callMethod(disposeMethod, new Object[] { });
135         }
136 
137         public void start() throws IOException {
138             callIOExceptionMethod(startMethod, new Object[] {});
139         }
140 
141         public void stop(long timeout) throws IOException {
142             callIOExceptionMethod(stopMethod, new Object[] {new Long(timeout)});
143         }
144         
145         private void callMethod(Method method, Object[] args) {
146             try {
147                 method.invoke(channel, args);
148             } catch (InvocationTargetException e) {
149                 if (e.getTargetException() instanceof RuntimeException) {
150                     throw (RuntimeException) e.getTargetException();
151                 }
152                 throw new RuntimeException(e.getTargetException());
153             } catch (Throwable e) {
154                 throw new RuntimeException("Reflexive invocation failed: " + e, e);
155             }            
156         }
157         
158         private void callIOExceptionMethod(Method method, Object[] args) throws IOException {
159             try {
160                 method.invoke(channel, args);
161             } catch (InvocationTargetException e) {
162                 if (e.getTargetException() instanceof IOException) {
163                     throw (IOException) e.getTargetException();
164                 }
165                 if (e.getTargetException() instanceof RuntimeException) {
166                     throw (RuntimeException) e.getTargetException();
167                 }
168                 throw new RuntimeException(e.getTargetException());
169             } catch (Throwable e) {
170                 throw (IOException) new IOException("Reflexive invocation failed: " + e).initCause(e);
171             }            
172         }
173 
174         //
175         // The following methods do not need to delegate since they
176         // are implemented as noops in the PipeChannel
177         //
178         public Object narrow(Class target) {
179             if (target.isAssignableFrom(getClass())) {
180                 return this;
181             }
182             return null;
183         }
184 
185         public void flush() throws IOException {
186         }
187 
188     }
189 
190     private boolean forceRefelection;
191 
192     public AsynchChannel openAsynchChannel(URI location) throws IOException {
193 
194         Object server = lookupServer(location);
195         if (!forceRefelection && server.getClass() == VMPipeAsynchChannelServer.class) {
196             return ((VMPipeAsynchChannelServer) server).connect();
197         }
198 
199         // Asume server is in a different classloader.
200         // Use reflection to connect.
201         try {
202             Method method = server.getClass().getMethod("connect", new Class[] {});
203             Object channel = method.invoke(server, new Object[] {});
204             return new ClassloaderAsynchChannelAdapter(channel);
205         } catch (Throwable e) {
206             throw (IOException) new IOException("Connection could not be established: " + e).initCause(e);
207         }
208     }
209 
210     public AsynchChannelServer bindAsynchChannel(URI bindURI) throws IOException {
211         VMPipeAsynchChannelServer server = new VMPipeAsynchChannelServer(bindURI);
212         bindServer(bindURI, server);
213         return server;
214     }
215 
216     private static Map getServerMap() {
217         return SERVER_MAP;
218     }
219 
220     static public String getServerKeyForURI(URI location) {
221         return location.getHost();
222     }
223 
224     public static void bindServer(URI bindURI, VMPipeAsynchChannelServer server) throws IOException {
225         String key = getServerKeyForURI(bindURI);
226         if (getServerMap().get(key) != null)
227             throw new IOException("Server is allready bound at: " + bindURI);
228         getServerMap().put(key, server);
229     }
230 
231     public static Object lookupServer(URI location) throws IOException {
232         String key = getServerKeyForURI(location);
233         Object server = getServerMap().get(key);
234         if (server == null) {
235             throw new IOException("Connection refused.");
236         }
237         return server;
238     }
239 
240     public static void unbindServer(URI bindURI) {
241         String key = getServerKeyForURI(bindURI);
242         getServerMap().remove(key);
243     }
244 
245     public boolean isForceRefelection() {
246         return forceRefelection;
247     }
248     
249     public void setForceRefelection(boolean forceRefelection) {
250         this.forceRefelection = forceRefelection;
251     }
252     
253 }