View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.message;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.util.WireByteArrayInputStream;
23  import org.codehaus.activemq.message.util.WireByteArrayOutputStream;
24  
25  import java.io.DataInput;
26  import java.io.DataInputStream;
27  import java.io.DataOutput;
28  import java.io.DataOutputStream;
29  import java.io.IOException;
30  import java.io.ObjectStreamException;
31  import java.io.Serializable;
32  
33  /***
34   * Default implementation used for Java-Java protocols.
35   * When talking to non-Java nodes we may use a different wire format.
36   *
37   * @version $Revision: 1.13 $
38   */
39  public class DefaultWireFormat extends WireFormat implements Serializable {
40      private static final Log log = LogFactory.getLog(DefaultWireFormat.class);
41  
42      private transient final PacketReader messageReader = new ActiveMQMessageReader();
43      private transient final PacketReader textMessageReader = new ActiveMQTextMessageReader();
44      private transient final PacketReader objectMessageReader = new ActiveMQObjectMessageReader();
45      private transient final PacketReader bytesMessageReader = new ActiveMQBytesMessageReader();
46      private transient final PacketReader streamMessageReader = new ActiveMQStreamMessageReader();
47      private transient final PacketReader mapMessageReader = new ActiveMQMapMessageReader();
48      private transient final PacketReader messageAckReader = new MessageAckReader();
49      private transient final PacketReader receiptReader = new ReceiptReader();
50      private transient final PacketReader consumerInfoReader = new ConsumerInfoReader();
51      private transient final PacketReader producerInfoReader = new ProducerInfoReader();
52      private transient final PacketReader transactionInfoReader = new TransactionInfoReader();
53      private transient final PacketReader xaTransactionInfoReader = new XATransactionInfoReader();
54      private transient final PacketReader brokerInfoReader = new BrokerInfoReader();
55      private transient final PacketReader connectionInfoReader = new ConnectionInfoReader();
56      private transient final PacketReader sessionInfoReader = new SessionInfoReader();
57      private transient final PacketReader durableUnsubscribeReader = new DurableUnsubscribeReader();
58      private transient final PacketReader reponseReceiptReader = new ResponseReceiptReader();
59      private transient final PacketReader intReponseReceiptReader = new IntResponseReceiptReader();
60      private transient final PacketReader capacityInfoReader = new CapacityInfoReader();
61      private transient final PacketReader capacityInfoRequestReader = new CapacityInfoRequestReader();
62  
63      private transient final PacketWriter messageWriter = new ActiveMQMessageWriter();
64      private transient final PacketWriter textMessageWriter = new ActiveMQTextMessageWriter();
65      private transient final PacketWriter objectMessageWriter = new ActiveMQObjectMessageWriter();
66      private transient final PacketWriter bytesMessageWriter = new ActiveMQBytesMessageWriter();
67      private transient final PacketWriter streamMessageWriter = new ActiveMQStreamMessageWriter();
68      private transient final PacketWriter mapMessageWriter = new ActiveMQMapMessageWriter();
69      private transient final PacketWriter messageAckWriter = new MessageAckWriter();
70      private transient final PacketWriter receiptWriter = new ReceiptWriter();
71      private transient final PacketWriter consumerInfoWriter = new ConsumerInfoWriter();
72      private transient final PacketWriter producerInfoWriter = new ProducerInfoWriter();
73      private transient final PacketWriter transactionInfoWriter = new TransactionInfoWriter();
74      private transient final PacketWriter xaTransactionInfoWriter = new XATransactionInfoWriter();
75      private transient final PacketWriter brokerInfoWriter = new BrokerInfoWriter();
76      private transient final PacketWriter connectionInfoWriter = new ConnectionInfoWriter();
77      private transient final PacketWriter sessionInfoWriter = new SessionInfoWriter();
78      private transient final PacketWriter durableUnsubscribeWriter = new DurableUnsubscribeWriter();
79      private transient final PacketWriter reponseReceiptWriter = new ResponseReceiptWriter();
80      private transient final PacketWriter intReponseReceiptWriter = new IntResponseReceiptWriter();
81      private transient final PacketWriter capacityInfoWriter = new CapacityInfoWriter();
82      private transient final PacketWriter capacityInfoRequestWriter = new CapacityInfoRequestWriter();
83  
84      private transient WireByteArrayOutputStream internalBytesOut;
85      private transient DataOutputStream internalDataOut;
86  
87      private transient WireByteArrayInputStream internalBytesIn;
88      private transient DataInputStream internalDataIn;
89  
90      /***
91       * Default Constructor
92       */
93      public DefaultWireFormat() {
94          internalBytesOut = new WireByteArrayOutputStream();
95          internalDataOut = new DataOutputStream(internalBytesOut);
96          internalBytesIn = new WireByteArrayInputStream();
97          internalDataIn = new DataInputStream(internalBytesIn);
98      }
99  
100     public WireFormat copy() {
101         return new DefaultWireFormat();
102     }
103 
104     public Packet readPacket(DataInput in) throws IOException {
105         int type = in.readByte();
106         return readPacket(type, in);
107     }
108 
109     public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
110         switch (firstByte) {
111             case Packet.ACTIVEMQ_MESSAGE:
112                 return readPacket(dataIn, messageReader);
113 
114             case Packet.ACTIVEMQ_TEXT_MESSAGE:
115                 return readPacket(dataIn, textMessageReader);
116 
117             case Packet.ACTIVEMQ_OBJECT_MESSAGE:
118                 return readPacket(dataIn, objectMessageReader);
119 
120             case Packet.ACTIVEMQ_BYTES_MESSAGE:
121                 return readPacket(dataIn, bytesMessageReader);
122 
123             case Packet.ACTIVEMQ_STREAM_MESSAGE:
124                 return readPacket(dataIn, streamMessageReader);
125 
126             case Packet.ACTIVEMQ_MAP_MESSAGE:
127                 return readPacket(dataIn, mapMessageReader);
128 
129             case Packet.ACTIVEMQ_MSG_ACK:
130                 return readPacket(dataIn, messageAckReader);
131 
132             case Packet.RECEIPT_INFO:
133                 return readPacket(dataIn, receiptReader);
134 
135             case Packet.CONSUMER_INFO:
136                 return readPacket(dataIn, consumerInfoReader);
137 
138             case Packet.PRODUCER_INFO:
139                 return readPacket(dataIn, producerInfoReader);
140 
141             case Packet.TRANSACTION_INFO:
142                 return readPacket(dataIn, transactionInfoReader);
143 
144             case Packet.XA_TRANSACTION_INFO:
145                 return readPacket(dataIn, xaTransactionInfoReader);
146 
147             case Packet.ACTIVEMQ_BROKER_INFO:
148                 return readPacket(dataIn, brokerInfoReader);
149 
150             case Packet.ACTIVEMQ_CONNECTION_INFO:
151                 return readPacket(dataIn, connectionInfoReader);
152 
153             case Packet.SESSION_INFO:
154                 return readPacket(dataIn, sessionInfoReader);
155 
156             case Packet.DURABLE_UNSUBSCRIBE:
157                 return readPacket(dataIn, durableUnsubscribeReader);
158 
159             case Packet.RESPONSE_RECEIPT_INFO:
160                 return readPacket(dataIn, reponseReceiptReader);
161 
162             case Packet.INT_RESPONSE_RECEIPT_INFO:
163                 return readPacket(dataIn, intReponseReceiptReader);
164 
165             case Packet.CAPACITY_INFO:
166                 return readPacket(dataIn, capacityInfoReader);
167 
168             case Packet.CAPACITY_INFO_REQUEST:
169                 return readPacket(dataIn, capacityInfoRequestReader);
170 
171 
172             default:
173                 log.error("Could not find PacketReader for packet type: " + AbstractPacket.getPacketTypeAsString(firstByte));
174                 return null;
175         }
176     }
177 
178 
179     /***
180      * Write a Packet to a DataOutput
181      *
182      * @param packet
183      * @param dataOut
184      * @throws IOException
185      */
186     public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
187         PacketWriter writer = getWriter(packet);
188         if (writer != null) {
189             writePacket(packet, dataOut, writer);
190         }
191     }
192 
193     /***
194      * A helper method which converts a packet into a byte array
195      * Overrides the WireFormat to make use of the internal BytesOutputStream
196      *
197      * @param packet
198      * @return a byte array representing the packet using some wire protocol
199      * @throws IOException
200      */
201     public byte[] toBytes(Packet packet) throws IOException {
202         byte[] data = null;
203         PacketWriter writer = getWriter(packet);
204         if (writer != null) {
205             internalBytesOut.reset();
206             internalDataOut.writeByte(packet.getPacketType());
207             internalDataOut.writeInt(-1);//the length
208             writer.writePacket(packet, internalDataOut);
209             internalDataOut.flush();
210             data = internalBytesOut.toByteArray();
211 
212             // lets subtract the header offset from the length
213             int length = data.length - 5;
214             packet.setMemoryUsage(length);
215             //write in the length to the data
216             data[1] = (byte) ((length >>> 24) & 0xFF);
217             data[2] = (byte) ((length >>> 16) & 0xFF);
218             data[3] = (byte) ((length >>> 8) & 0xFF);
219             data[4] = (byte) ((length >>> 0) & 0xFF);
220         }
221         return data;
222     }
223 
224 
225     protected synchronized final void writePacket(Packet packet, DataOutput dataOut, PacketWriter writer) throws IOException {
226         dataOut.writeByte(packet.getPacketType());
227         internalBytesOut.reset();
228         writer.writePacket(packet, internalDataOut);
229         internalDataOut.flush();
230         //reuse the byte buffer in the ByteArrayOutputStream
231         byte[] data = internalBytesOut.getData();
232         int count = internalBytesOut.size();
233         dataOut.writeInt(count);
234         //byte[] data = internalBytesOut.toByteArray();
235         //int count = data.length;
236         //dataOut.writeInt(count);
237         
238         packet.setMemoryUsage(count);
239         dataOut.write(data, 0, count);
240     }
241 
242     protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
243         Packet packet = reader.createPacket();
244         int length = dataIn.readInt();
245         packet.setMemoryUsage(length);
246         // read all the remaining data in one chunk ignoring the header
247 
248         // TODO sometimes the length should exclude the header?
249         byte[] data = new byte[length];
250         dataIn.readFully(data);
251         //then splat into the internal datainput
252         internalBytesIn.restart(data);
253         reader.buildPacket(packet, internalDataIn);
254         return packet;
255     }
256 
257     private Object readResolve() throws ObjectStreamException {
258         return new DefaultWireFormat();
259     }
260 
261     private PacketWriter getWriter(Packet packet) throws IOException {
262         PacketWriter answer = null;
263         switch (packet.getPacketType()) {
264             case Packet.ACTIVEMQ_MESSAGE:
265                 answer = messageWriter;
266                 break;
267 
268             case Packet.ACTIVEMQ_TEXT_MESSAGE:
269                 answer = textMessageWriter;
270                 break;
271 
272             case Packet.ACTIVEMQ_OBJECT_MESSAGE:
273                 answer = objectMessageWriter;
274                 break;
275 
276             case Packet.ACTIVEMQ_BYTES_MESSAGE:
277                 answer = bytesMessageWriter;
278                 break;
279 
280             case Packet.ACTIVEMQ_STREAM_MESSAGE:
281                 answer = streamMessageWriter;
282                 break;
283 
284             case Packet.ACTIVEMQ_MAP_MESSAGE:
285                 answer = mapMessageWriter;
286                 break;
287 
288             case Packet.ACTIVEMQ_MSG_ACK:
289                 answer = messageAckWriter;
290                 break;
291 
292             case Packet.RECEIPT_INFO:
293                 answer = receiptWriter;
294                 break;
295 
296             case Packet.CONSUMER_INFO:
297                 answer = consumerInfoWriter;
298                 break;
299 
300             case Packet.PRODUCER_INFO:
301                 answer = producerInfoWriter;
302                 break;
303 
304             case Packet.TRANSACTION_INFO:
305                 answer = transactionInfoWriter;
306                 break;
307 
308             case Packet.XA_TRANSACTION_INFO:
309                 answer = xaTransactionInfoWriter;
310                 break;
311 
312             case Packet.ACTIVEMQ_BROKER_INFO:
313                 answer = brokerInfoWriter;
314                 break;
315 
316             case Packet.ACTIVEMQ_CONNECTION_INFO:
317                 answer = connectionInfoWriter;
318                 break;
319 
320             case Packet.SESSION_INFO:
321                 answer = sessionInfoWriter;
322                 break;
323 
324             case Packet.DURABLE_UNSUBSCRIBE:
325                 answer = durableUnsubscribeWriter;
326                 break;
327 
328             case Packet.RESPONSE_RECEIPT_INFO:
329                 answer = reponseReceiptWriter;
330                 break;
331 
332             case Packet.INT_RESPONSE_RECEIPT_INFO:
333                 answer = intReponseReceiptWriter;
334                 break;
335 
336             case Packet.CAPACITY_INFO:
337                 answer = capacityInfoWriter;
338                 break;
339 
340             case Packet.CAPACITY_INFO_REQUEST:
341                 answer = capacityInfoRequestWriter;
342                 break;
343 
344             default:
345                 log.error("no PacketWriter for packet: " + packet);
346         }
347         return answer;
348     }
349 }