1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.activeio.filter;
18
19 import java.io.IOException;
20 import java.util.LinkedList;
21
22 import org.activeio.FilterSynchChannel;
23 import org.activeio.Packet;
24 import org.activeio.SynchChannel;
25
26 /***
27 * This PacketAggregatingSynchChannel can be used when the client is sending a
28 * 'record' style packet down the channel stack and needs receiving end to
29 * receive the same 'record' packets.
30 *
31 * This is very usefull since in general, a channel does not garantee that a
32 * Packet that is sent down will not be fragmented or combined with other Packet
33 * objects.
34 *
35 * This {@see org.activeio.SynchChannel} adds a 4 byte header
36 * to each packet that is sent down.
37 *
38 * @version $Revision$
39 */
40 final public class PacketAggregatingSynchChannel extends FilterSynchChannel {
41
42 private final LinkedList assembledPackets = new LinkedList();
43 private final PacketAggregator aggregator = new PacketAggregator() {
44 protected void packetAssembled(Packet packet) {
45 assembledPackets.addLast(packet);
46 }
47 };
48
49 /***
50 * @param next
51 */
52 public PacketAggregatingSynchChannel(SynchChannel next) {
53 super(next);
54 }
55
56 public Packet read(long timeout) throws IOException {
57 long start = System.currentTimeMillis();
58 if( assembledPackets.isEmpty() ) {
59 while( true ) {
60
61 Packet packet = getNext().read(timeout);
62 if( packet==null ) {
63 return null;
64 }
65
66 aggregator.addRawPacket(packet);
67
68
69 if( assembledPackets.isEmpty() ) {
70 if( timeout == WAIT_FOREVER_TIMEOUT )
71 continue;
72
73 timeout = Math.max(0, timeout-(System.currentTimeMillis()-start));
74 if( timeout != 0 )
75 continue;
76
77 return null;
78 } else {
79 return (Packet) assembledPackets.removeFirst();
80 }
81 }
82
83 } else {
84 return (Packet) assembledPackets.removeFirst();
85 }
86
87 }
88
89 public void write(Packet packet) throws IOException {
90 getNext().write(aggregator.getHeader(packet));
91 getNext().write(packet);
92 }
93 }