1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activeio.oneport;
19
20 import java.io.IOException;
21 import java.net.URI;
22 import java.util.Iterator;
23
24 import org.activeio.AcceptListener;
25 import org.activeio.AsynchChannel;
26 import org.activeio.AsynchChannelListener;
27 import org.activeio.AsynchChannelServer;
28 import org.activeio.Channel;
29 import org.activeio.FilterAsynchChannel;
30 import org.activeio.FilterAsynchChannelServer;
31 import org.activeio.Packet;
32 import org.activeio.SynchChannel;
33 import org.activeio.adapter.AsynchToSynchChannelAdapter;
34 import org.activeio.adapter.SynchToAsynchChannelAdapter;
35 import org.activeio.filter.PushbackSynchChannel;
36 import org.activeio.packet.AppendedPacket;
37
38 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
39
40 /***
41 * Allows multiple protocols share a single ChannelServer. All protocols sharing the server
42 * must have a distinct magic number at the beging of the client's request.
43 *
44 * TODO: handle the case where a client opens a connection but sends no data down the stream. We need
45 * to timeout that client.
46 *
47 * @version $Revision$
48 */
49 final public class OnePortAsynchChannelServer extends FilterAsynchChannelServer {
50
51 /***
52 * The OnePortAsynchChannelServer listens for incoming connection
53 * from a normal AsynchChannelServer. This s the listner used
54 * to receive the accepted channels.
55 */
56 final private class OnePortAcceptListener implements AcceptListener {
57
58 public void onAccept(Channel channel) {
59 try {
60 AsynchChannel asynchChannel = SynchToAsynchChannelAdapter.adapt(channel);
61 ProtocolInspectingAsynchChannel inspector = new ProtocolInspectingAsynchChannel(asynchChannel);
62 inspector.start();
63 } catch (IOException e) {
64 onAcceptError(e);
65 }
66 }
67
68 public void onAcceptError(IOException error) {
69 dispose();
70 }
71 }
72
73 /***
74 * This channel filter sniffs the first few bytes of the byte stream
75 * to see if a ProtocolRecognizer recognizes the protocol. If it does not
76 * it just closes the channel, otherwise the associated SubPortAsynchChannelServer
77 * is notified that it accepted a channel.
78 *
79 */
80 final private class ProtocolInspectingAsynchChannel extends FilterAsynchChannel {
81 private Packet buffer;
82
83 public ProtocolInspectingAsynchChannel(AsynchChannel next) throws IOException {
84 super(next);
85 setAsynchChannelListener(new AsynchChannelListener() {
86 public void onPacket(Packet packet) {
87 if (buffer == null) {
88 buffer = packet;
89 } else {
90 buffer = AppendedPacket.join(buffer, packet);
91 }
92 findMagicNumber();
93 }
94
95 public void onPacketError(IOException error) {
96 dispose();
97 }
98 });
99 }
100
101 private void findMagicNumber() {
102 for (Iterator iter = recognizerMap.keySet().iterator(); iter.hasNext();) {
103 ProtocolRecognizer recognizer = (ProtocolRecognizer) iter.next();
104 if (recognizer.recognizes(buffer.duplicate())) {
105
106 if( UnknownRecognizer.UNKNOWN_RECOGNIZER == recognizer ) {
107
108 dispose();
109 }
110
111 SubPortAsynchChannelServer onePort = (SubPortAsynchChannelServer) recognizerMap.get(recognizer);
112 if( onePort == null ) {
113
114 dispose();
115 }
116
117
118
119
120
121
122
123 try {
124 stop(NO_WAIT_TIMEOUT);
125 setAsynchChannelListener(null);
126 } catch (IOException e) {
127 getAsynchChannelListener().onPacketError(e);
128 }
129
130 Channel channel = getNext();
131 channel = AsynchToSynchChannelAdapter.adapt(channel);
132 channel = new PushbackSynchChannel((SynchChannel) channel, buffer);
133 channel = SynchToAsynchChannelAdapter.adapt(channel);
134
135 onePort.onAccept(channel);
136 break;
137 }
138 }
139 }
140 }
141
142 /***
143 * Clients bind against the OnePortAsynchChannelServer and get
144 * SubPortAsynchChannelServer which can be used to accept connections.
145 */
146 final private class SubPortAsynchChannelServer implements AsynchChannelServer {
147
148 private final ProtocolRecognizer recognizer;
149 private AcceptListener acceptListener;
150 private boolean started;
151
152 /***
153 * @param recognizer
154 */
155 public SubPortAsynchChannelServer(ProtocolRecognizer recognizer) {
156 this.recognizer = recognizer;
157 }
158
159 public void setAcceptListener(AcceptListener acceptListener) {
160 this.acceptListener = acceptListener;
161 }
162
163 public URI getBindURI() {
164 return next.getBindURI();
165 }
166
167 public URI getConnectURI() {
168 return next.getConnectURI();
169 }
170
171 public void dispose() {
172 started = false;
173 recognizerMap.remove(recognizer);
174 }
175
176 public void start() throws IOException {
177 started = true;
178 }
179 public void stop(long timeout) throws IOException {
180 started = false;
181 }
182
183 void onAccept(Channel channel) {
184 if( started && acceptListener!=null ) {
185 acceptListener.onAccept(channel);
186 } else {
187
188 channel.dispose();
189 }
190 }
191
192 public Object narrow(Class target) {
193 if( target.isAssignableFrom(getClass()) ) {
194 return this;
195 }
196 return OnePortAsynchChannelServer.this.narrow(target);
197 }
198
199 }
200
201
202 private final ConcurrentHashMap recognizerMap = new ConcurrentHashMap();
203
204 public OnePortAsynchChannelServer(AsynchChannelServer server) throws IOException {
205 super(server);
206 super.setAcceptListener(new OnePortAcceptListener());
207 }
208
209 public void setAcceptListener(AcceptListener acceptListener) {
210 throw new IllegalAccessError("Not supported");
211 }
212
213 public AsynchChannelServer bindAsynchChannel(ProtocolRecognizer recognizer) throws IOException {
214
215 if( recognizerMap.contains(recognizer) )
216 throw new IOException("That recognizer is allredy bound.");
217
218 SubPortAsynchChannelServer server = new SubPortAsynchChannelServer(recognizer);
219 Object old = recognizerMap.put(recognizer, server);
220 return server;
221 }
222
223
224 }