View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  
24  import javax.jms.JMSException;
25  import java.io.ByteArrayInputStream;
26  import java.io.ByteArrayOutputStream;
27  import java.io.DataInput;
28  import java.io.DataInputStream;
29  import java.io.DataOutput;
30  import java.io.DataOutputStream;
31  import java.io.IOException;
32  import java.net.DatagramPacket;
33  
34  /***
35   * Represents a strategy of encoding packets on the wire or on disk
36   * using some kind of serialization or wire format.
37   * <p/>
38   * We use a default efficient format
39   * for Java to Java communication but other formats to other systems
40   * can be used, such as using simple text
41   * strings when talking to JavaScript or coming up with other formats for
42   * talking to C / C# languages or proprietary messaging systems
43   * we wish to interface with at the wire level etc.
44   *
45   * @version $Revision: 1.9 $
46   */
47  public abstract class WireFormat {
48  
49      private static final Log log = LogFactory.getLog(WireFormat.class);
50  
51      /***
52       * Reads a packet from the given input stream
53       *
54       * @param in
55       * @return
56       * @throws IOException
57       */
58      public abstract Packet readPacket(DataInput in) throws IOException;
59  
60      /***
61       * A helper method for working with sockets where the first byte is read
62       * first, then the rest of the message is read.
63       * <p/>
64       * Its common when dealing with sockets to have different timeout semantics
65       * until the first non-zero byte is read of a message, after which
66       * time a zero timeout is used.
67       *
68       * @param firstByte the first byte of the packet
69       * @param in        the rest of the packet
70       * @return
71       * @throws IOException
72       */
73      public abstract Packet readPacket(int firstByte, DataInput in) throws IOException;
74  
75  
76      /***
77       * Read a packet from a Datagram packet from the given channelID. If the
78       * packet is from the same channel ID as it was sent then we have a
79       * loop-back so discard the packet
80       *
81       * @param channelID is the unique channel ID
82       * @param dpacket
83       * @return the packet read from the datagram or null if it should be
84       *         discarded
85       * @throws IOException
86       */
87      public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException {
88          DataInput in = new DataInputStream(new ByteArrayInputStream(dpacket.getData(), dpacket.getOffset(), dpacket.getLength()));
89          String id = in.readUTF();
90  
91          if (channelID == null) {
92              log.trace("We do not have a channelID which is probably caused by a synchronization issue, we're receiving messages before we're fully initialised");
93          }
94          else if (channelID.equals(id)) {
95              if (log.isTraceEnabled()) {
96                  log.trace("Discarding packet from id: " + id);
97              }
98              return null;
99          }
100         int type = in.readByte();
101         Packet packet = readPacket(type, in);
102 
103 //        if (packet instanceof ActiveMQMessage) {
104 //            System.out.println("#####  read packet from channel: " + id + " in channel: " + channelID + " message: " + packet);
105 //        }
106 //
107         return packet;
108     }
109 
110     /***
111      * Writes the packet to the given output stream
112      *
113      * @param packet
114      * @param out
115      * @throws IOException
116      */
117     public abstract void writePacket(Packet packet, DataOutput out) throws IOException, JMSException;
118 
119     /***
120      * Writes the given package to a new datagram
121      *
122      * @param channelID is the unique channel ID
123      * @param packet    is the packet to write
124      */
125     public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException {
126         ByteArrayOutputStream out = new ByteArrayOutputStream();
127         DataOutputStream dataOut = new DataOutputStream(out);
128         dataOut.writeUTF(channelID);
129 
130 //        if (packet instanceof ActiveMQMessage) {
131 //            System.out.println("##### write packet from channel: " + channelID + " message: " + packet);
132 //        }
133 
134         writePacket(packet, dataOut);
135         dataOut.close();
136         byte[] data = out.toByteArray();
137         return new DatagramPacket(data, data.length);
138     }
139 
140     /***
141      * Reads the packet from the given byte[]
142      */
143     public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException {
144         //System.out.println("Reading from offset: " + offset + " length: " + length);
145         DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, offset, length));
146         return readPacket(in);
147     }
148 
149     /***
150      * Reads the packet from the given byte[]
151      */
152     public Packet fromBytes(byte[] bytes) throws IOException {
153         DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
154         return readPacket(in);
155     }
156 
157     /***
158      * A helper method which converts a packet into a byte array
159      *
160      * @param packet
161      * @return a byte array representing the packet using some wire protocol
162      * @throws IOException
163      */
164     public byte[] toBytes(Packet packet) throws IOException, JMSException {
165         ByteArrayOutputStream out = new ByteArrayOutputStream();
166         DataOutputStream dataOut = new DataOutputStream(out);
167         writePacket(packet, dataOut);
168         dataOut.close();
169         return out.toByteArray();
170     }
171 
172     /***
173      * Creates a new copy of this wire format so it can be used in another thread/context
174      *
175      * @return
176      */
177     public abstract WireFormat copy();
178 }