1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.message;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.util.WireByteArrayInputStream;
23 import org.codehaus.activemq.message.util.WireByteArrayOutputStream;
24
25 import java.io.DataInput;
26 import java.io.DataInputStream;
27 import java.io.DataOutput;
28 import java.io.DataOutputStream;
29 import java.io.IOException;
30 import java.io.ObjectStreamException;
31 import java.io.Serializable;
32
33 /***
34 * Default implementation used for Java-Java protocols.
35 * When talking to non-Java nodes we may use a different wire format.
36 *
37 * @version $Revision: 1.13 $
38 */
39 public class DefaultWireFormat extends WireFormat implements Serializable {
40 private static final Log log = LogFactory.getLog(DefaultWireFormat.class);
41
42 private transient final PacketReader messageReader = new ActiveMQMessageReader();
43 private transient final PacketReader textMessageReader = new ActiveMQTextMessageReader();
44 private transient final PacketReader objectMessageReader = new ActiveMQObjectMessageReader();
45 private transient final PacketReader bytesMessageReader = new ActiveMQBytesMessageReader();
46 private transient final PacketReader streamMessageReader = new ActiveMQStreamMessageReader();
47 private transient final PacketReader mapMessageReader = new ActiveMQMapMessageReader();
48 private transient final PacketReader messageAckReader = new MessageAckReader();
49 private transient final PacketReader receiptReader = new ReceiptReader();
50 private transient final PacketReader consumerInfoReader = new ConsumerInfoReader();
51 private transient final PacketReader producerInfoReader = new ProducerInfoReader();
52 private transient final PacketReader transactionInfoReader = new TransactionInfoReader();
53 private transient final PacketReader xaTransactionInfoReader = new XATransactionInfoReader();
54 private transient final PacketReader brokerInfoReader = new BrokerInfoReader();
55 private transient final PacketReader connectionInfoReader = new ConnectionInfoReader();
56 private transient final PacketReader sessionInfoReader = new SessionInfoReader();
57 private transient final PacketReader durableUnsubscribeReader = new DurableUnsubscribeReader();
58 private transient final PacketReader reponseReceiptReader = new ResponseReceiptReader();
59 private transient final PacketReader intReponseReceiptReader = new IntResponseReceiptReader();
60 private transient final PacketReader capacityInfoReader = new CapacityInfoReader();
61 private transient final PacketReader capacityInfoRequestReader = new CapacityInfoRequestReader();
62
63 private transient final PacketWriter messageWriter = new ActiveMQMessageWriter();
64 private transient final PacketWriter textMessageWriter = new ActiveMQTextMessageWriter();
65 private transient final PacketWriter objectMessageWriter = new ActiveMQObjectMessageWriter();
66 private transient final PacketWriter bytesMessageWriter = new ActiveMQBytesMessageWriter();
67 private transient final PacketWriter streamMessageWriter = new ActiveMQStreamMessageWriter();
68 private transient final PacketWriter mapMessageWriter = new ActiveMQMapMessageWriter();
69 private transient final PacketWriter messageAckWriter = new MessageAckWriter();
70 private transient final PacketWriter receiptWriter = new ReceiptWriter();
71 private transient final PacketWriter consumerInfoWriter = new ConsumerInfoWriter();
72 private transient final PacketWriter producerInfoWriter = new ProducerInfoWriter();
73 private transient final PacketWriter transactionInfoWriter = new TransactionInfoWriter();
74 private transient final PacketWriter xaTransactionInfoWriter = new XATransactionInfoWriter();
75 private transient final PacketWriter brokerInfoWriter = new BrokerInfoWriter();
76 private transient final PacketWriter connectionInfoWriter = new ConnectionInfoWriter();
77 private transient final PacketWriter sessionInfoWriter = new SessionInfoWriter();
78 private transient final PacketWriter durableUnsubscribeWriter = new DurableUnsubscribeWriter();
79 private transient final PacketWriter reponseReceiptWriter = new ResponseReceiptWriter();
80 private transient final PacketWriter intReponseReceiptWriter = new IntResponseReceiptWriter();
81 private transient final PacketWriter capacityInfoWriter = new CapacityInfoWriter();
82 private transient final PacketWriter capacityInfoRequestWriter = new CapacityInfoRequestWriter();
83
84 private transient WireByteArrayOutputStream internalBytesOut;
85 private transient DataOutputStream internalDataOut;
86
87 private transient WireByteArrayInputStream internalBytesIn;
88 private transient DataInputStream internalDataIn;
89
90 /***
91 * Default Constructor
92 */
93 public DefaultWireFormat() {
94 internalBytesOut = new WireByteArrayOutputStream();
95 internalDataOut = new DataOutputStream(internalBytesOut);
96 internalBytesIn = new WireByteArrayInputStream();
97 internalDataIn = new DataInputStream(internalBytesIn);
98 }
99
100 public WireFormat copy() {
101 return new DefaultWireFormat();
102 }
103
104 public Packet readPacket(DataInput in) throws IOException {
105 int type = in.readByte();
106 return readPacket(type, in);
107 }
108
109 public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
110 switch (firstByte) {
111 case Packet.ACTIVEMQ_MESSAGE:
112 return readPacket(dataIn, messageReader);
113
114 case Packet.ACTIVEMQ_TEXT_MESSAGE:
115 return readPacket(dataIn, textMessageReader);
116
117 case Packet.ACTIVEMQ_OBJECT_MESSAGE:
118 return readPacket(dataIn, objectMessageReader);
119
120 case Packet.ACTIVEMQ_BYTES_MESSAGE:
121 return readPacket(dataIn, bytesMessageReader);
122
123 case Packet.ACTIVEMQ_STREAM_MESSAGE:
124 return readPacket(dataIn, streamMessageReader);
125
126 case Packet.ACTIVEMQ_MAP_MESSAGE:
127 return readPacket(dataIn, mapMessageReader);
128
129 case Packet.ACTIVEMQ_MSG_ACK:
130 return readPacket(dataIn, messageAckReader);
131
132 case Packet.RECEIPT_INFO:
133 return readPacket(dataIn, receiptReader);
134
135 case Packet.CONSUMER_INFO:
136 return readPacket(dataIn, consumerInfoReader);
137
138 case Packet.PRODUCER_INFO:
139 return readPacket(dataIn, producerInfoReader);
140
141 case Packet.TRANSACTION_INFO:
142 return readPacket(dataIn, transactionInfoReader);
143
144 case Packet.XA_TRANSACTION_INFO:
145 return readPacket(dataIn, xaTransactionInfoReader);
146
147 case Packet.ACTIVEMQ_BROKER_INFO:
148 return readPacket(dataIn, brokerInfoReader);
149
150 case Packet.ACTIVEMQ_CONNECTION_INFO:
151 return readPacket(dataIn, connectionInfoReader);
152
153 case Packet.SESSION_INFO:
154 return readPacket(dataIn, sessionInfoReader);
155
156 case Packet.DURABLE_UNSUBSCRIBE:
157 return readPacket(dataIn, durableUnsubscribeReader);
158
159 case Packet.RESPONSE_RECEIPT_INFO:
160 return readPacket(dataIn, reponseReceiptReader);
161
162 case Packet.INT_RESPONSE_RECEIPT_INFO:
163 return readPacket(dataIn, intReponseReceiptReader);
164
165 case Packet.CAPACITY_INFO:
166 return readPacket(dataIn, capacityInfoReader);
167
168 case Packet.CAPACITY_INFO_REQUEST:
169 return readPacket(dataIn, capacityInfoRequestReader);
170
171
172 default:
173 log.error("Could not find PacketReader for packet type: " + AbstractPacket.getPacketTypeAsString(firstByte));
174 return null;
175 }
176 }
177
178
179 /***
180 * Write a Packet to a DataOutput
181 *
182 * @param packet
183 * @param dataOut
184 * @throws IOException
185 */
186 public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
187 PacketWriter writer = getWriter(packet);
188 if (writer != null) {
189 writePacket(packet, dataOut, writer);
190 }
191 }
192
193 /***
194 * A helper method which converts a packet into a byte array
195 * Overrides the WireFormat to make use of the internal BytesOutputStream
196 *
197 * @param packet
198 * @return a byte array representing the packet using some wire protocol
199 * @throws IOException
200 */
201 public byte[] toBytes(Packet packet) throws IOException {
202 byte[] data = null;
203 PacketWriter writer = getWriter(packet);
204 if (writer != null) {
205 internalBytesOut.reset();
206 internalDataOut.writeByte(packet.getPacketType());
207 internalDataOut.writeInt(-1);
208 writer.writePacket(packet, internalDataOut);
209 internalDataOut.flush();
210 data = internalBytesOut.toByteArray();
211
212
213 int length = data.length - 5;
214 packet.setMemoryUsage(length);
215
216 data[1] = (byte) ((length >>> 24) & 0xFF);
217 data[2] = (byte) ((length >>> 16) & 0xFF);
218 data[3] = (byte) ((length >>> 8) & 0xFF);
219 data[4] = (byte) ((length >>> 0) & 0xFF);
220 }
221 return data;
222 }
223
224
225 protected synchronized final void writePacket(Packet packet, DataOutput dataOut, PacketWriter writer) throws IOException {
226 dataOut.writeByte(packet.getPacketType());
227 internalBytesOut.reset();
228 writer.writePacket(packet, internalDataOut);
229 internalDataOut.flush();
230
231 byte[] data = internalBytesOut.getData();
232 int count = internalBytesOut.size();
233 dataOut.writeInt(count);
234
235
236
237
238 packet.setMemoryUsage(count);
239 dataOut.write(data, 0, count);
240 }
241
242 protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
243 Packet packet = reader.createPacket();
244 int length = dataIn.readInt();
245 packet.setMemoryUsage(length);
246
247
248
249 byte[] data = new byte[length];
250 dataIn.readFully(data);
251
252 internalBytesIn.restart(data);
253 reader.buildPacket(packet, internalDataIn);
254 return packet;
255 }
256
257 private Object readResolve() throws ObjectStreamException {
258 return new DefaultWireFormat();
259 }
260
261 private PacketWriter getWriter(Packet packet) throws IOException {
262 PacketWriter answer = null;
263 switch (packet.getPacketType()) {
264 case Packet.ACTIVEMQ_MESSAGE:
265 answer = messageWriter;
266 break;
267
268 case Packet.ACTIVEMQ_TEXT_MESSAGE:
269 answer = textMessageWriter;
270 break;
271
272 case Packet.ACTIVEMQ_OBJECT_MESSAGE:
273 answer = objectMessageWriter;
274 break;
275
276 case Packet.ACTIVEMQ_BYTES_MESSAGE:
277 answer = bytesMessageWriter;
278 break;
279
280 case Packet.ACTIVEMQ_STREAM_MESSAGE:
281 answer = streamMessageWriter;
282 break;
283
284 case Packet.ACTIVEMQ_MAP_MESSAGE:
285 answer = mapMessageWriter;
286 break;
287
288 case Packet.ACTIVEMQ_MSG_ACK:
289 answer = messageAckWriter;
290 break;
291
292 case Packet.RECEIPT_INFO:
293 answer = receiptWriter;
294 break;
295
296 case Packet.CONSUMER_INFO:
297 answer = consumerInfoWriter;
298 break;
299
300 case Packet.PRODUCER_INFO:
301 answer = producerInfoWriter;
302 break;
303
304 case Packet.TRANSACTION_INFO:
305 answer = transactionInfoWriter;
306 break;
307
308 case Packet.XA_TRANSACTION_INFO:
309 answer = xaTransactionInfoWriter;
310 break;
311
312 case Packet.ACTIVEMQ_BROKER_INFO:
313 answer = brokerInfoWriter;
314 break;
315
316 case Packet.ACTIVEMQ_CONNECTION_INFO:
317 answer = connectionInfoWriter;
318 break;
319
320 case Packet.SESSION_INFO:
321 answer = sessionInfoWriter;
322 break;
323
324 case Packet.DURABLE_UNSUBSCRIBE:
325 answer = durableUnsubscribeWriter;
326 break;
327
328 case Packet.RESPONSE_RECEIPT_INFO:
329 answer = reponseReceiptWriter;
330 break;
331
332 case Packet.INT_RESPONSE_RECEIPT_INFO:
333 answer = intReponseReceiptWriter;
334 break;
335
336 case Packet.CAPACITY_INFO:
337 answer = capacityInfoWriter;
338 break;
339
340 case Packet.CAPACITY_INFO_REQUEST:
341 answer = capacityInfoRequestWriter;
342 break;
343
344 default:
345 log.error("no PacketWriter for packet: " + packet);
346 }
347 return answer;
348 }
349 }