View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.activeio.filter;
18  
19  import java.io.IOException;
20  
21  import org.activeio.Packet;
22  import org.activeio.PacketData;
23  import org.activeio.packet.AppendedPacket;
24  import org.activeio.packet.ByteArrayPacket;
25  import org.activeio.packet.EOSPacket;
26  
27  /***
28   * @version $Revision$
29   */
30  abstract public class PacketAggregator {
31  
32      private static final int HEADER_LENGTH = 4;        
33      
34      private final ByteArrayPacket headerBuffer = new ByteArrayPacket(new byte[HEADER_LENGTH]);
35      private final PacketData headerData = new PacketData(headerBuffer);
36  
37      Packet incompleteUpPacket;
38      boolean headerLoaded;
39      private int upPacketLength;
40      
41      public void addRawPacket(Packet packet) throws IOException {
42  
43          // Passthrough the EOS packet.
44          if( packet == EOSPacket.EOS_PACKET ) {
45              packetAssembled(packet);
46              return;
47          }
48  
49          if (incompleteUpPacket != null) {
50              packet = AppendedPacket.join(incompleteUpPacket, packet);
51              incompleteUpPacket = null;
52          }
53  
54          while (true) {
55  
56              if (!headerLoaded) {
57                  headerLoaded = packet.remaining() >= HEADER_LENGTH;
58                  if( headerLoaded ) {
59                      PacketData data = new PacketData(packet);
60                      upPacketLength = data.readInt();
61                      if( upPacketLength < 0 ) {
62                          throw new IOException("Up packet lenth was invalid: "+upPacketLength);
63                      }
64                      packet = packet.slice();
65                  }
66                  if( !headerLoaded )
67                      break;
68              }
69  
70              if (packet.remaining() < upPacketLength )
71                  break;
72  
73              // Get ready to create a slice to send up.
74              int origLimit = packet.limit();
75              packet.limit(upPacketLength);
76              packetAssembled(packet.slice());
77              
78              // Get a slice of the remaining since that will dump
79              // the first packets of an AppendedPacket
80              packet.position(upPacketLength);
81              packet.limit(origLimit);
82              packet = packet.slice();
83  
84              // Need to load a header again now.
85              headerLoaded = false;
86          }
87          if (packet.hasRemaining()) {
88              incompleteUpPacket = packet;
89          }
90          
91      }
92  
93      protected abstract void packetAssembled(Packet packet);
94      
95      public Packet getHeader( Packet packet ) throws IOException {
96          headerBuffer.clear();
97          headerData.writeInt(packet.remaining());
98          headerBuffer.flip();
99          return headerBuffer;
100     }
101 }