1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.activeio.net;
18  
19  import java.io.IOException;
20  import java.net.URI;
21  import java.net.URISyntaxException;
22  
23  import junit.framework.TestCase;
24  
25  import org.activeio.Channel;
26  import org.activeio.ChannelServer;
27  import org.activeio.Packet;
28  import org.activeio.SynchChannel;
29  import org.activeio.SynchChannelServer;
30  import org.activeio.adapter.AsynchToSynchChannelAdapter;
31  import org.activeio.adapter.AsynchToSynchChannelServerAdapter;
32  import org.activeio.packet.ByteArrayPacket;
33  import org.activeio.packet.EOSPacket;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  
37  import EDU.oswego.cs.dl.util.concurrent.Executor;
38  import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
39  
40  /***
41   * Used to test the {@see org.activeio.net.TcpSynchChannel}
42   *  
43   * @version $Revision$
44   */
45  abstract public class SynchChannelTestSupport extends TestCase {
46      
47      Log log = LogFactory.getLog(SynchChannelTestSupport.class);
48      private SynchChannelServer server;
49      private SynchChannel clientChannel;
50      private SynchChannel serverChannel;
51      Executor sendExecutor = new QueuedExecutor();
52      
53      public void testSmallSendReceive() throws IOException, URISyntaxException, InterruptedException {
54          if( isDisabled() ) {
55              log.info("test disabled: "+getName());
56              return;
57          }
58          Packet outboundPacket = new ByteArrayPacket("Hello World".getBytes());
59          doSendReceive(outboundPacket.duplicate());
60      }
61  
62      public void testPeerDisconnect() throws IOException, URISyntaxException, InterruptedException {
63          if( isDisabled() ) {
64              log.info("test disabled: "+getName());
65              return;
66          }
67          
68          Packet outboundPacket = new ByteArrayPacket("Hello World".getBytes());
69          doSendReceive(outboundPacket.duplicate());
70          // disconnect the client.
71          clientChannel.dispose();
72          
73          // The server should get an EOS packet.
74          Packet packet = serverChannel.read(1000);
75          assertEquals(EOSPacket.EOS_PACKET, packet);        
76      }
77  
78      public void testManySmallSendReceives() throws IOException, URISyntaxException, InterruptedException {
79          if( isDisabled() ) {
80              log.info("test disabled: "+getName());
81              return;
82          }
83          log.info("Start of testManySmallSendReceives");
84          Packet outboundPacket = new ByteArrayPacket("Hello World".getBytes());
85          long start = System.currentTimeMillis();
86          for( int i=0; i < getTestIterations(); i++ ) {
87              doSendReceive(outboundPacket.duplicate());
88          }
89          long end = System.currentTimeMillis();        
90          log.info("done. Duration: "+duration(start,end)+", duration per send: "+unitDuration(start, end, getTestIterations()));
91      }
92  
93      private float unitDuration(long start, long end, int testIterations) {
94  		return duration(start,end)/testIterations;
95  	}
96  
97  	private float duration(long start, long end) {
98  		return (float) (((float)(end-start))/1000.0f);
99  	}
100 
101 	protected int getTestIterations() {
102         return 1000;
103     }
104 
105     protected void setUp() throws Exception {
106 
107         log.info("Running: "+getName());
108 
109         if( isDisabled() ) {
110             return;
111         }
112 
113         log.info("Bind to an annonymous tcp port.");
114         server = AsynchToSynchChannelServerAdapter.adapt(bindChannel());
115         server.start();
116         log.info("Server Bound at URI: "+server.getBindURI());
117                 
118         log.info("Client connecting to: "+server.getConnectURI());
119         clientChannel = AsynchToSynchChannelAdapter.adapt( openChannel(server.getConnectURI()));
120         clientChannel.start();
121         SocketMetadata socket = (SocketMetadata) clientChannel.narrow(SocketMetadata.class);
122         if( socket != null )
123             socket.setTcpNoDelay(true);
124         log.info("Get connection that was accepted on the server side.");
125         
126         Channel c = server.accept(1000*5);
127 	assertNotNull(c);
128          
129         serverChannel = AsynchToSynchChannelAdapter.adapt(c);
130         serverChannel.start();
131         socket = (SocketMetadata) serverChannel.narrow(SocketMetadata.class);                
132         if( socket != null ) {
133             socket.setTcpNoDelay(true);
134             log.info("Server Channel's Remote addreess: "+socket.getRemoteSocketAddress());
135             log.info("Server Channel's Local addreess: "+socket.getLocalSocketAddress());
136         }
137     }
138     
139     /***
140      * @param outboundPacket
141      * @throws IOException
142      * @throws URISyntaxException
143      * @throws InterruptedException 
144      */
145     private void doSendReceive(final Packet outboundPacket) throws IOException, URISyntaxException, InterruptedException {
146         ByteArrayPacket inboundPacket = new ByteArrayPacket(new byte[outboundPacket.remaining()]);
147 
148         // Do the send async.
149         sendExecutor.execute( new Runnable() {
150             public void run() {
151                 try {
152                     clientChannel.write(outboundPacket);
153                     clientChannel.flush();
154                 } catch (IOException e) {
155                 }
156             }
157         });
158         
159         while( inboundPacket.hasRemaining() ) {
160 	        Packet packet = serverChannel.read(1000*5);
161 	        assertNotNull(packet);
162 	        packet.read(inboundPacket);
163         }        
164         outboundPacket.clear();
165         inboundPacket.clear();
166         assertEquals(outboundPacket.sliceAsBytes(), inboundPacket.sliceAsBytes());
167                 
168     }
169 
170     protected void tearDown() throws Exception {
171         if( isDisabled() ) {
172             return;
173         }
174         log.info("Closing down the channels.");
175         serverChannel.dispose();        
176         clientChannel.dispose();
177         server.dispose();
178     }
179     
180     protected boolean isDisabled() {
181         return false;
182     }
183 
184     public void assertEquals(byte []b1, byte[] b2 ) {
185         assertEquals(b1.length, b2.length);
186         for (int i = 0; i < b2.length; i++) {
187             assertEquals(b1[i], b2[i]);
188         }
189     }
190     
191     abstract protected Channel openChannel(URI connectURI) throws IOException ;
192     abstract protected ChannelServer bindChannel() throws IOException, URISyntaxException;
193     
194     
195 }