1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbc.adapter;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQXid;
23 import org.codehaus.activemq.service.SubscriberEntry;
24 import org.codehaus.activemq.service.Transaction;
25 import org.codehaus.activemq.service.TransactionManager;
26 import org.codehaus.activemq.service.impl.XATransactionCommand;
27 import org.codehaus.activemq.store.jdbc.JDBCAdapter;
28 import org.codehaus.activemq.store.jdbc.SequenceGenerator;
29 import org.codehaus.activemq.store.jdbc.StatementProvider;
30
31 import javax.jms.JMSException;
32 import javax.transaction.xa.XAException;
33 import java.sql.Connection;
34 import java.sql.PreparedStatement;
35 import java.sql.ResultSet;
36 import java.sql.SQLException;
37 import java.sql.Statement;
38 import java.util.List;
39
40 /***
41 * Implements all the default JDBC operations that are used
42 * by the JDBCPersistenceAdapter.
43 * <p/>
44 * Subclassing is encouraged to override the default
45 * implementation of methods to account for differences
46 * in JDBC Driver implementations.
47 * <p/>
48 * The JDBCAdapter inserts and extracts BLOB data using the
49 * getBytes()/setBytes() operations.
50 * <p/>
51 * The databases/JDBC drivers that use this adapter are:
52 * <ul>
53 * <li></li>
54 * </ul>
55 *
56 * @version $Revision: 1.3 $
57 */
58 public class DefaultJDBCAdapter implements JDBCAdapter {
59
60 private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
61
62 final protected CachingStatementProvider statementProvider;
63 protected SequenceGenerator sequenceGenerator = new SequenceGenerator();
64
65 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
66 s.setBytes(index, data);
67 }
68
69 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
70 return rs.getBytes(index);
71 }
72
73 /***
74 * @param provider
75 */
76 public DefaultJDBCAdapter(StatementProvider provider) {
77 this.statementProvider = new CachingStatementProvider(provider);
78 }
79
80 public DefaultJDBCAdapter() {
81 this(new DefaultStatementProvider());
82 }
83
84 public SequenceGenerator getSequenceGenerator() {
85 return sequenceGenerator;
86 }
87
88 public void doCreateTables(Connection c) throws SQLException {
89 Statement s = null;
90 try {
91 s = c.createStatement();
92 String[] createStatments = statementProvider.getCreateSchemaStatments();
93 for (int i = 0; i < createStatments.length; i++) {
94
95
96 try {
97 boolean rc = s.execute(createStatments[i]);
98 }
99 catch (SQLException e) {
100 log.debug("Statment failed: " + createStatments[i], e);
101 }
102 }
103 }
104 finally {
105 try {
106 s.close();
107 }
108 catch (Throwable e) {
109 }
110 }
111 }
112
113 public void initSequenceGenerator(Connection c) {
114 PreparedStatement s = null;
115 ResultSet rs = null;
116 try {
117 s = c.prepareStatement(statementProvider.getFindLastSequenceId());
118 rs = s.executeQuery();
119 if (rs.next()) {
120 sequenceGenerator.setLastSequenceId(rs.getLong(1));
121 }
122 }
123 catch (SQLException e) {
124 log.warn("Failed to find last sequence number: " + e, e);
125 }
126 finally {
127 try {
128 rs.close();
129 }
130 catch (Throwable e) {
131 }
132 try {
133 s.close();
134 }
135 catch (Throwable e) {
136 }
137 }
138 }
139
140 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, JMSException {
141 PreparedStatement s = null;
142 try {
143 s = c.prepareStatement(statementProvider.getAddMessageStatment());
144 s.setLong(1, seq);
145 s.setString(2, destinationName);
146 s.setString(3, messageID);
147 setBinaryData(s, 4, data);
148 if (s.executeUpdate() != 1) {
149 throw new JMSException("Failed to broker message: " + messageID + " in container. ");
150 }
151 }
152 finally {
153 try {
154 s.close();
155 }
156 catch (Throwable e) {
157 }
158 }
159 }
160
161 public byte[] doGetMessage(Connection c, long seq) throws SQLException {
162 PreparedStatement s = null;
163 ResultSet rs = null;
164 try {
165
166 s = c.prepareStatement(statementProvider.getFindMessageStatment());
167 s.setLong(1, seq);
168 rs = s.executeQuery();
169
170 if (!rs.next()) {
171 return null;
172 }
173 return getBinaryData(rs, 1);
174
175 }
176 finally {
177 try {
178 rs.close();
179 }
180 catch (Throwable e) {
181 }
182 try {
183 s.close();
184 }
185 catch (Throwable e) {
186 }
187 }
188 }
189
190 public void doRemoveMessage(Connection c, long seq) throws SQLException {
191 PreparedStatement s = null;
192 try {
193 s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
194 s.setLong(1, seq);
195 if (s.executeUpdate() != 1) {
196 log.error("Could not delete sequenece number for: " + seq);
197 }
198 }
199 finally {
200 try {
201 s.close();
202 }
203 catch (Throwable e) {
204 }
205 }
206 }
207
208 public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException {
209 PreparedStatement s = null;
210 ResultSet rs = null;
211 try {
212
213 s = c.prepareStatement(statementProvider.getFindAllMessagesStatment());
214 s.setString(1, destinationName);
215 rs = s.executeQuery();
216
217 while (rs.next()) {
218 long seq = rs.getLong(1);
219 String msgid = rs.getString(2);
220 listener.onMessage(seq, msgid);
221 }
222
223 }
224 finally {
225 try {
226 rs.close();
227 }
228 catch (Throwable e) {
229 }
230 try {
231 s.close();
232 }
233 catch (Throwable e) {
234 }
235 }
236 }
237
238 public void doGetXids(Connection c, List list) throws SQLException {
239 PreparedStatement s = null;
240 ResultSet rs = null;
241 try {
242 s = c.prepareStatement(statementProvider.getFindAllXidStatment());
243 rs = s.executeQuery();
244
245 while (rs.next()) {
246 String xid = rs.getString(1);
247 try {
248 list.add(new ActiveMQXid(xid));
249 }
250 catch (JMSException e) {
251 log.error("Failed to recover prepared transaction due to invalid xid: " + xid, e);
252 }
253 }
254
255 }
256 finally {
257 try {
258 rs.close();
259 }
260 catch (Throwable e) {
261 }
262 try {
263 s.close();
264 }
265 catch (Throwable e) {
266 }
267 }
268 }
269
270 public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
271 PreparedStatement s = null;
272 try {
273 s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
274 s.setString(1, xid.toLocalTransactionId());
275 if (s.executeUpdate() != 1) {
276 throw new XAException("Failed to remove prepared transaction: " + xid + ".");
277 }
278 }
279 finally {
280 try {
281 s.close();
282 }
283 catch (Throwable e) {
284 }
285 }
286 }
287
288
289 public void doAddXid(Connection c, ActiveMQXid xid, byte[] data) throws SQLException, XAException {
290 PreparedStatement s = null;
291 try {
292
293 s = c.prepareStatement(statementProvider.getAddMessageStatment());
294 s.setString(1, xid.toLocalTransactionId());
295 setBinaryData(s, 2, data);
296 if (s.executeUpdate() != 1) {
297 throw new XAException("Failed to store prepared transaction: " + xid);
298 }
299
300 }
301 finally {
302 try {
303 s.close();
304 }
305 catch (Throwable e) {
306 }
307 }
308 }
309
310 public void doLoadPreparedTransactions(Connection c, TransactionManager transactionManager) throws SQLException {
311 PreparedStatement s = null;
312 ResultSet rs = null;
313 try {
314
315 s = c.prepareStatement(statementProvider.getFindAllTxStatment());
316 rs = s.executeQuery();
317
318 while (rs.next()) {
319 String id = rs.getString(1);
320 byte data[] = this.getBinaryData(rs, 2);
321 try {
322 ActiveMQXid xid = new ActiveMQXid(id);
323 Transaction transaction = XATransactionCommand.fromBytes(data);
324 transactionManager.loadTransaction(xid, transaction);
325 }
326 catch (Exception e) {
327 log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
328 }
329 }
330 }
331 finally {
332 try {
333 rs.close();
334 }
335 catch (Throwable e) {
336 }
337 try {
338 s.close();
339 }
340 catch (Throwable e) {
341 }
342 }
343 }
344
345 /***
346 * @throws JMSException
347 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long)
348 */
349 public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException {
350 PreparedStatement s = null;
351 try {
352 s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
353 s.setLong(1, seq);
354 s.setString(2, subscriptionID);
355 s.setString(3, destinationName);
356
357 if (s.executeUpdate() != 1) {
358 throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
359 }
360 }
361 finally {
362 try {
363 s.close();
364 }
365 catch (Throwable e) {
366 }
367 }
368 }
369
370 /***
371 * @throws JMSException
372 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler)
373 */
374 public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException {
375 PreparedStatement s = null;
376 ResultSet rs = null;
377 try {
378
379 s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
380 s.setString(1, destinationName);
381 s.setString(2, subscriptionID);
382 rs = s.executeQuery();
383
384 while (rs.next()) {
385 long seq = rs.getLong(1);
386 String msgid = rs.getString(2);
387 listener.onMessage(seq, msgid);
388 }
389
390 }
391 finally {
392 try {
393 rs.close();
394 }
395 catch (Throwable e) {
396 }
397 try {
398 s.close();
399 }
400 catch (Throwable e) {
401 }
402 }
403 }
404
405 /***
406 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.codehaus.activemq.service.SubscriberEntry)
407 */
408 public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException {
409 PreparedStatement s = null;
410 try {
411 s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment());
412 s.setInt(1, subscriberEntry.getSubscriberID());
413 s.setString(2, subscriberEntry.getClientID());
414 s.setString(3, subscriberEntry.getConsumerName());
415 s.setString(4, subscriberEntry.getSelector());
416 s.setString(5, sub);
417 s.setString(6, destinationName);
418
419
420 if (s.executeUpdate() != 1) {
421 s.close();
422 s = c.prepareStatement(statementProvider.getCreateDurableSubStatment());
423 s.setInt(1, subscriberEntry.getSubscriberID());
424 s.setString(2, subscriberEntry.getClientID());
425 s.setString(3, subscriberEntry.getConsumerName());
426 s.setString(4, subscriberEntry.getSelector());
427 s.setString(5, sub);
428 s.setString(6, destinationName);
429
430 if (s.executeUpdate() != 1) {
431 log.error("Failed to store durable subscription for: " + sub);
432 }
433 }
434 }
435 finally {
436 try {
437 s.close();
438 }
439 catch (Throwable e) {
440 }
441 }
442 }
443
444 /***
445 * @see org.codehaus.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object)
446 */
447 public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException {
448 PreparedStatement s = null;
449 ResultSet rs = null;
450 try {
451
452 s = c.prepareStatement(statementProvider.getFindDurableSubStatment());
453 s.setString(1, sub);
454 s.setString(2, destinationName);
455 rs = s.executeQuery();
456
457 if (!rs.next()) {
458 return null;
459 }
460
461 SubscriberEntry answer = new SubscriberEntry();
462 answer.setSubscriberID(rs.getInt(1));
463 answer.setClientID(rs.getString(2));
464 answer.setConsumerName(rs.getString(3));
465 answer.setDestination(rs.getString(4));
466
467 return answer;
468
469 }
470 finally {
471 try {
472 rs.close();
473 }
474 catch (Throwable e) {
475 }
476 try {
477 s.close();
478 }
479 catch (Throwable e) {
480 }
481 }
482 }
483 }