1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.codehaus.activemq.broker.BrokerClient;
21 import org.codehaus.activemq.broker.impl.BrokerClientImpl;
22 import org.codehaus.activemq.message.ActiveMQMessage;
23 import org.codehaus.activemq.message.DefaultWireFormat;
24 import org.codehaus.activemq.message.MessageAck;
25 import org.codehaus.activemq.message.Packet;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.service.TransactionTask;
28 import org.codehaus.activemq.util.JMSExceptionHelper;
29
30 import javax.jms.JMSException;
31 import java.io.Externalizable;
32 import java.io.IOException;
33 import java.io.ObjectInput;
34 import java.io.ObjectOutput;
35
36 /***
37 * @version $Revision: 1.1 $
38 */
39 public abstract class PacketTransactionTask implements TransactionTask, Externalizable {
40 private static transient final WireFormat wireFormat = new DefaultWireFormat();
41
42 private BrokerClient brokerClient;
43 private Packet packet;
44
45
46 public static TransactionTask fromBytes(byte[] data) throws IOException {
47 Packet packet = wireFormat.fromBytes(data);
48 return createTask(packet);
49 }
50
51 public byte[] toBytes() throws JMSException, IOException {
52 return wireFormat.toBytes(packet);
53 }
54
55 public static TransactionTask readTask(ObjectInput in) throws IOException {
56 Packet packet = readPacket(in);
57 return createTask(packet);
58 }
59
60 public static TransactionTask createTask(Packet packet) throws IOException {
61 if (packet instanceof MessageAck) {
62 return new MessageAckTransactionTask(null, (MessageAck) packet);
63 }
64 else if (packet instanceof ActiveMQMessage) {
65 return new SendMessageTransactionTask(null, (ActiveMQMessage) packet);
66 }
67 else {
68 throw new IOException("Unexpected packet type: " + packet);
69 }
70 }
71
72 public static void writeTask(TransactionTask task, ObjectOutput out) throws IOException {
73 if (task instanceof PacketTransactionTask) {
74 PacketTransactionTask packetTask = (PacketTransactionTask) task;
75 writePacket(packetTask.getPacket(), out);
76 }
77 else {
78 out.writeObject(task);
79 }
80 }
81
82 protected PacketTransactionTask(BrokerClient brokerClient, Packet packet) {
83 this.brokerClient = brokerClient;
84 this.packet = packet;
85 }
86
87 public Packet getPacket() {
88 return packet;
89 }
90
91 public void writeExternal(ObjectOutput out) throws IOException {
92 writePacket(packet, out);
93 }
94
95 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
96 packet = readPacket(in);
97 }
98
99
100
101
102 protected BrokerClient createBrokerClient(String consumerId) throws JMSException {
103 BrokerClientImpl answer = new BrokerClientImpl();
104 return answer;
105 }
106
107 protected BrokerClient getBrokerClient(String consumerId) throws JMSException {
108
109 brokerClient = createBrokerClient(consumerId);
110
111
112
113
114
115 return brokerClient;
116 }
117
118 protected static void writePacket(Packet packet, ObjectOutput out) throws IOException {
119 try {
120 wireFormat.writePacket(packet, out);
121 }
122 catch (JMSException e) {
123 throw JMSExceptionHelper.newIOException(e);
124 }
125 }
126
127 protected static Packet readPacket(ObjectInput in) throws IOException {
128 return wireFormat.readPacket(in);
129 }
130 }