/***
 *
 * Copyright 2004 Hiram Chirino
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activeio.net;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;

import org.activeio.Channel;
import org.activeio.FilterSynchChannel;
import org.activeio.FilterSynchChannelServer;
import org.activeio.Packet;
import org.activeio.SynchChannel;
import org.activeio.SynchChannelFactory;
import org.activeio.SynchChannelServer;

/**
 * Makes the channels opened through another {@link org.activeio.SynchChannelFactory}
 * have write operations with built in delays, for testing.
 * <p>
 * Each write is split into chunks of at most <code>maxPacketSize</code> (measured
 * in whatever unit {@link Packet#remaining()} reports) and the writing thread
 * sleeps for <code>packetDelay</code> milliseconds after every chunk.
 *
 * @version $Revision$
 */
public class SlowWriteSynchChannelFactory implements SynchChannelFactory {

    /** The factory that creates the real channels being slowed down. */
    final SynchChannelFactory next;
    /** Upper bound on the size of a single chunk; non-positive disables chunking. */
    private final int maxPacketSize;
    /** Pause, in milliseconds, taken after each chunk is written. */
    private final long packetDelay;

    /**
     * @param next          the factory whose channels are wrapped
     * @param maxPacketSize the largest chunk handed to the wrapped channel in one
     *                      write; a value of zero or less means "do not split"
     * @param packetDelay   milliseconds to sleep after each chunk; passed straight
     *                      to {@link Thread#sleep(long)}
     */
    public SlowWriteSynchChannelFactory(final SynchChannelFactory next, int maxPacketSize, long packetDelay) {
        this.next = next;
        this.maxPacketSize = maxPacketSize;
        this.packetDelay = packetDelay;
    }

    /**
     * Channel filter that forwards writes to the wrapped channel in small,
     * delayed chunks. Reads the chunk size and delay from the enclosing factory.
     */
    class SlowWriteSynchChannel extends FilterSynchChannel {

        public SlowWriteSynchChannel(SynchChannel next) {
            super(next);
        }

        /**
         * Writes the packet to the wrapped channel one chunk at a time, sleeping
         * after every chunk (including the last one).
         *
         * @param packet the data to write; only a slice of it is manipulated here
         * @throws InterruptedIOException if the thread is interrupted while
         *         sleeping between chunks; the interrupt status is restored first
         * @throws IOException if the wrapped channel fails to write a chunk
         */
        public void write(Packet packet) throws IOException {
            // Work on a slice so the position/limit changes below are made on
            // the slice rather than directly on the caller's packet.
            Packet pending = packet.slice();
            while (pending.hasRemaining()) {
                int available = pending.remaining();
                // Cap the chunk at maxPacketSize. (This used to be Math.max, which
                // never produced a chunk smaller than the whole packet and could
                // ask for a position beyond the end of it.) A non-positive
                // maxPacketSize would yield empty chunks forever, so it is
                // treated as "write everything in one chunk".
                int size = maxPacketSize > 0 ? Math.min(maxPacketSize, available) : available;

                // Assumes Packet follows java.nio.Buffer-style semantics:
                // everything after 'size' becomes the remainder, everything
                // before it becomes the chunk to send now.
                pending.position(size);
                Packet remainder = pending.slice();
                pending.flip();
                Packet data = pending.slice();

                super.write(data);
                pending = remainder;

                try {
                    Thread.sleep(packetDelay);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
            }
        }
    }

    /**
     * Server filter that wraps every accepted channel in a
     * {@link SlowWriteSynchChannel}.
     */
    class SlowWriteSynchChannelServer extends FilterSynchChannelServer {

        public SlowWriteSynchChannelServer(SynchChannelServer next) {
            super(next);
        }

        /**
         * Accepts a channel from the wrapped server and slows down its writes.
         *
         * @param timeout passed through unchanged to the wrapped server
         * @return the accepted channel wrapped in a {@link SlowWriteSynchChannel},
         *         or <code>null</code> if the wrapped server returned <code>null</code>
         * @throws IOException if the wrapped server fails to accept
         */
        public Channel accept(long timeout) throws IOException {
            Channel accepted = super.accept(timeout);
            if (accepted == null) {
                return null;
            }
            // The accepted channel is cast to SynchChannel; anything else
            // fails here with a ClassCastException.
            return new SlowWriteSynchChannel((SynchChannel) accepted);
        }
    }

    /**
     * Binds a server through the wrapped factory and returns it as is.
     * <p>
     * NOTE(review): the returned server is not wrapped in
     * {@link SlowWriteSynchChannelServer}, so channels it accepts are not slowed
     * down and that inner class is currently unused by this factory. Confirm
     * whether that is intentional before changing it.
     *
     * @param location the location to bind to
     * @return the server created by the wrapped factory
     * @throws IOException if the wrapped factory fails to bind
     */
    public SynchChannelServer bindSynchChannel(URI location) throws IOException {
        return next.bindSynchChannel(location);
    }

    /**
     * Opens a channel through the wrapped factory and slows down its writes.
     *
     * @param location the location to connect to
     * @return the opened channel wrapped in a {@link SlowWriteSynchChannel}
     * @throws IOException if the wrapped factory fails to open the channel
     */
    public SynchChannel openSynchChannel(URI location) throws IOException {
        return new SlowWriteSynchChannel(next.openSynchChannel(location));
    }

}