1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.broker.Broker;
24 import org.codehaus.activemq.broker.BrokerClient;
25 import org.codehaus.activemq.message.ActiveMQXid;
26 import org.codehaus.activemq.service.Transaction;
27 import org.codehaus.activemq.service.TransactionManager;
28 import org.codehaus.activemq.store.PreparedTransactionStore;
29 import org.codehaus.activemq.util.JMSExceptionHelper;
30
31 import javax.jms.JMSException;
32 import javax.transaction.xa.XAException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36
37 /***
38 * @version $Revision: 1.4 $
39 */
40 public class TransactionManagerImpl implements TransactionManager {
41 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
42
43
44 private Broker broker;
45
46 private PreparedTransactionStore preparedTransactions;
47
48 private Map activeClients = new ConcurrentHashMap();
49
50 private Map localTxs = new ConcurrentHashMap();
51
52 private Map xaTxs = new ConcurrentHashMap();
53
54 private final ThreadLocal contextTx = new ThreadLocal();
55
56 public TransactionManagerImpl(Broker broker, PreparedTransactionStore preparedTransactions) {
57 this.preparedTransactions = preparedTransactions;
58 this.broker = broker;
59 }
60
61 /***
62 * @see org.codehaus.activemq.service.TransactionManager#createLocalTransaction(org.codehaus.activemq.broker.BrokerClient, String)
63 */
64 public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
65 AbstractTransaction t = new AbstractTransaction(broker) {
66
67 public void commit(boolean onePhase) throws XAException {
68
69 try {
70 prePrepare();
71 }
72 catch (XAException e) {
73 throw e;
74 }
75 catch (Throwable e) {
76 log.warn("COMMIT FAILED: ", e);
77 rollback();
78
79 XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
80 xae.errorCode = XAException.XA_RBOTHER;
81 xae.initCause(e);
82 throw xae;
83 }
84
85 setState(AbstractTransaction.FINISHED_STATE);
86 localTxs.remove(txid);
87
88 try {
89 postCommit();
90 }
91 catch (Throwable e) {
92
93
94 log.warn("POST COMMIT FAILED: ", e);
95 XAException xae = new XAException("POST COMMIT FAILED");
96 xae.errorCode = XAException.XAER_RMERR;
97 xae.initCause(e);
98 throw xae;
99 }
100 }
101
102 public void rollback() throws XAException {
103
104 setState(AbstractTransaction.FINISHED_STATE);
105 localTxs.remove(txid);
106
107 try {
108 postRollback();
109 }
110 catch (Throwable e) {
111 log.warn("POST ROLLBACK FAILED: ", e);
112 XAException xae = new XAException("POST ROLLBACK FAILED");
113 xae.errorCode = XAException.XAER_RMERR;
114 xae.initCause(e);
115 throw xae;
116 }
117 }
118
119 public int prepare() throws XAException {
120 XAException xae = new XAException("Prepare not implemented on Local Transactions.");
121 xae.errorCode = XAException.XAER_RMERR;
122 throw xae;
123 }
124
125 };
126 localTxs.put(txid, t);
127 return t;
128 }
129
130 /***
131 * @see org.codehaus.activemq.service.TransactionManager#createXATransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
132 */
133 public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
134 AbstractTransaction t = new XATransactionCommand(broker, xid, xaTxs, preparedTransactions);
135 xaTxs.put(xid, t);
136 return t;
137 }
138
139 /***
140 * @see org.codehaus.activemq.service.TransactionManager#getLocalTransaction(String)
141 */
142 public Transaction getLocalTransaction(String txid) throws JMSException {
143 Transaction tx = (Transaction) localTxs.get(txid);
144 if (tx == null) {
145 throw new JMSException("Transaction '" + txid
146 + "' has not been started.");
147 }
148 return tx;
149 }
150
151 /***
152 * @see org.codehaus.activemq.service.TransactionManager#getXATransaction(org.codehaus.activemq.message.ActiveMQXid)
153 */
154 public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
155 Transaction tx = (Transaction) xaTxs.get(xid);
156 if (tx == null) {
157 XAException e = new XAException("Transaction '" + xid + "' has not been started.");
158 e.errorCode = XAException.XAER_NOTA;
159 throw e;
160 }
161 return tx;
162 }
163
164 /***
165 * @see org.codehaus.activemq.service.TransactionManager#getPreparedXATransactions()
166 */
167 public ActiveMQXid[] getPreparedXATransactions() throws XAException {
168 return preparedTransactions.getXids();
169 }
170
171 /***
172 * @see org.codehaus.activemq.service.TransactionManager#setContexTransaction(org.codehaus.activemq.service.Transaction)
173 */
174 public void setContexTransaction(Transaction tx) {
175 contextTx.set(tx);
176 }
177
178 /***
179 * @see org.codehaus.activemq.service.TransactionManager#getContexTransaction()
180 */
181 public Transaction getContexTransaction() {
182 return (Transaction) contextTx.get();
183 }
184
185 /***
186 * @see org.codehaus.activemq.service.TransactionManager#cleanUpClient(org.codehaus.activemq.broker.BrokerClient)
187 */
188 public void cleanUpClient(BrokerClient client) throws JMSException {
189
190
191
192 List list = (List) activeClients.remove(client);
193 if (list != null) {
194 for (int i = 0; i < list.size(); i++) {
195 try {
196 Object o = list.get(i);
197 if (o instanceof String) {
198 Transaction t = this.getLocalTransaction((String) o);
199 t.rollback();
200 }
201 else {
202 Transaction t = this.getXATransaction((ActiveMQXid) o);
203 t.rollback();
204 }
205 }
206 catch (Exception e) {
207 log.warn("ERROR Rolling back disconnected client's transactions: ", e);
208 }
209 }
210 list.clear();
211 }
212 }
213
214 public void loadTransaction(ActiveMQXid xid, Transaction transaction) throws XAException {
215
216
217 if (transaction instanceof XATransactionCommand) {
218 XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
219 xaTransaction.initialise(xaTxs, preparedTransactions);
220 }
221 transaction.setBroker(broker);
222
223 xaTxs.put(xid, transaction);
224 }
225
226 public void start() throws JMSException {
227 preparedTransactions.start();
228 try {
229 preparedTransactions.loadPreparedTransactions(this);
230 }
231 catch (XAException e) {
232 throw JMSExceptionHelper.newJMSException("Failed to recover: " + e, e);
233 }
234 }
235
236 public void stop() throws JMSException {
237 preparedTransactions.stop();
238 }
239
240
241
242
243
244
245 private void addActiveTransaction(BrokerClient client, Object transactionId) {
246 List list = (List) activeClients.get(client);
247 if (list == null) {
248 list = new ArrayList();
249 activeClients.put(client, list);
250 }
251 list.add(transactionId);
252 }
253
254 private void removeActiveTransaction(BrokerClient client, Object transactionId) {
255 List list = (List) activeClients.get(client);
256 if (list != null) {
257 list.remove(transactionId);
258 }
259 }
260
261
262 }