001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc.adapter; 019 020 import java.sql.Connection; 021 import java.sql.PreparedStatement; 022 import java.sql.ResultSet; 023 import java.sql.SQLException; 024 import java.sql.Statement; 025 026 import javax.jms.JMSException; 027 import javax.transaction.xa.XAException; 028 029 import org.activemq.message.ActiveMQXid; 030 import org.activemq.service.SubscriberEntry; 031 import org.activemq.store.TransactionStore.RecoveryListener; 032 import org.activemq.store.jdbc.JDBCAdapter; 033 import org.activemq.store.jdbc.StatementProvider; 034 import org.activemq.util.LongSequenceGenerator; 035 import org.apache.commons.logging.Log; 036 import org.apache.commons.logging.LogFactory; 037 038 /** 039 * Implements all the default JDBC operations that are used 040 * by the JDBCPersistenceAdapter. 041 * <p/> 042 * Subclassing is encouraged to override the default 043 * implementation of methods to account for differences 044 * in JDBC Driver implementations. 045 * <p/> 046 * The JDBCAdapter inserts and extracts BLOB data using the 047 * getBytes()/setBytes() operations. 048 * <p/> 049 * The databases/JDBC drivers that use this adapter are: 050 * <ul> 051 * <li></li> 052 * </ul> 053 * 054 * @version $Revision: 1.1 $ 055 */ 056 public class DefaultJDBCAdapter implements JDBCAdapter { 057 058 private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class); 059 060 final protected StatementProvider statementProvider; 061 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 062 063 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 064 s.setBytes(index, data); 065 } 066 067 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 068 return rs.getBytes(index); 069 } 070 071 /** 072 * @param provider 073 */ 074 public DefaultJDBCAdapter(StatementProvider provider) { 075 this.statementProvider = new CachingStatementProvider(provider); 076 } 077 078 public DefaultJDBCAdapter() { 079 this(new DefaultStatementProvider()); 080 } 081 082 public LongSequenceGenerator getSequenceGenerator() { 083 return sequenceGenerator; 084 } 085 086 public void doCreateTables(Connection c) throws SQLException { 087 Statement s = null; 088 try { 089 s = c.createStatement(); 090 String[] createStatments = statementProvider.getCreateSchemaStatments(); 091 for (int i = 0; i < createStatments.length; i++) { 092 // This will fail usually since the tables will be 093 // created allready. 094 try { 095 boolean rc = s.execute(createStatments[i]); 096 } 097 catch (SQLException e) { 098 log.info("Could not create JDBC tables; they could already exist." + 099 " Failure was: " + createStatments[i] + " Message: " + e.getMessage() + 100 " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() ); 101 } 102 } 103 c.commit(); 104 } 105 finally { 106 try { 107 s.close(); 108 } 109 catch (Throwable e) { 110 } 111 } 112 } 113 114 public void doDropTables(Connection c) throws SQLException { 115 Statement s = null; 116 try { 117 s = c.createStatement(); 118 String[] dropStatments = statementProvider.getDropSchemaStatments(); 119 for (int i = 0; i < dropStatments.length; i++) { 120 // This will fail usually since the tables will be 121 // created allready. 122 try { 123 boolean rc = s.execute(dropStatments[i]); 124 } 125 catch (SQLException e) { 126 log.warn("Could not drop JDBC tables; they may not exist." + 127 " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() + 128 " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() ); 129 } 130 } 131 c.commit(); 132 } 133 finally { 134 try { 135 s.close(); 136 } 137 catch (Throwable e) { 138 } 139 } 140 } 141 142 public void initSequenceGenerator(Connection c) { 143 PreparedStatement s = null; 144 ResultSet rs = null; 145 try { 146 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs()); 147 rs = s.executeQuery(); 148 long seq1 = 0; 149 if (rs.next()) { 150 seq1 = rs.getLong(1); 151 } 152 rs.close(); 153 s.close(); 154 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInAcks()); 155 rs = s.executeQuery(); 156 long seq2 = 0; 157 if (rs.next()) { 158 seq2 = rs.getLong(1); 159 } 160 161 sequenceGenerator.setLastSequenceId(Math.max(seq1, seq2)); 162 log.debug("Last sequence id: "+sequenceGenerator.getLastSequenceId()); 163 } 164 catch (SQLException e) { 165 log.warn("Failed to find last sequence number: " + e, e); 166 } 167 finally { 168 try { 169 rs.close(); 170 } 171 catch (Throwable e) { 172 } 173 try { 174 s.close(); 175 } 176 catch (Throwable e) { 177 } 178 } 179 } 180 181 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data, long expiration) throws SQLException, JMSException { 182 PreparedStatement s = null; 183 try { 184 s = c.prepareStatement(statementProvider.getAddMessageStatment()); 185 s.setLong(1, seq); 186 s.setString(2, destinationName); 187 s.setString(3, messageID); 188 setBinaryData(s, 4, data); 189 s.setLong(5, expiration); 190 if (s.executeUpdate() != 1) { 191 throw new JMSException("Failed to broker message: " + messageID + " in container. "); 192 } 193 } 194 finally { 195 try { 196 s.close(); 197 } 198 catch (Throwable e) { 199 } 200 } 201 } 202 203 public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException { 204 PreparedStatement s = null; 205 ResultSet rs = null; 206 try { 207 208 s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment()); 209 s.setString(1, messageID); 210 rs = s.executeQuery(); 211 212 if (!rs.next()) { 213 return null; 214 } 215 return new Long( rs.getLong(1) ); 216 217 } 218 finally { 219 try { 220 rs.close(); 221 } 222 catch (Throwable e) { 223 } 224 try { 225 s.close(); 226 } 227 catch (Throwable e) { 228 } 229 } 230 } 231 232 public byte[] doGetMessage(Connection c, long seq) throws SQLException { 233 PreparedStatement s = null; 234 ResultSet rs = null; 235 try { 236 237 s = c.prepareStatement(statementProvider.getFindMessageStatment()); 238 s.setLong(1, seq); 239 rs = s.executeQuery(); 240 241 if (!rs.next()) { 242 return null; 243 } 244 return getBinaryData(rs, 1); 245 246 } 247 finally { 248 try { 249 rs.close(); 250 } 251 catch (Throwable e) { 252 } 253 try { 254 s.close(); 255 } 256 catch (Throwable e) { 257 } 258 } 259 } 260 261 public void doRemoveMessage(Connection c, long seq) throws SQLException { 262 PreparedStatement s = null; 263 try { 264 s = c.prepareStatement(statementProvider.getRemoveMessageStatment()); 265 s.setLong(1, seq); 266 if (s.executeUpdate() != 1) { 267 log.error("Could not delete sequenece number for: " + seq); 268 } 269 } 270 finally { 271 try { 272 s.close(); 273 } 274 catch (Throwable e) { 275 } 276 } 277 } 278 279 public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException { 280 PreparedStatement s = null; 281 ResultSet rs = null; 282 try { 283 284 s = c.prepareStatement(statementProvider.getFindAllMessagesStatment()); 285 s.setString(1, destinationName); 286 rs = s.executeQuery(); 287 288 while (rs.next()) { 289 long seq = rs.getLong(1); 290 String msgid = rs.getString(2); 291 listener.onMessage(seq, msgid); 292 } 293 294 } 295 finally { 296 try { 297 rs.close(); 298 } 299 catch (Throwable e) { 300 } 301 try { 302 s.close(); 303 } 304 catch (Throwable e) { 305 } 306 } 307 } 308 309 public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException { 310 PreparedStatement s = null; 311 try { 312 s = c.prepareStatement(statementProvider.getRemoveXidStatment()); 313 s.setString(1, xid.toLocalTransactionId()); 314 if (s.executeUpdate() != 1) { 315 throw new XAException("Failed to remove prepared transaction: " + xid + "."); 316 } 317 } 318 finally { 319 try { 320 s.close(); 321 } 322 catch (Throwable e) { 323 } 324 } 325 } 326 327 328 public void doAddXid(Connection c, ActiveMQXid xid) throws SQLException, XAException { 329 PreparedStatement s = null; 330 try { 331 332 s = c.prepareStatement(statementProvider.getAddXidStatment()); 333 s.setString(1, xid.toLocalTransactionId()); 334 if (s.executeUpdate() != 1) { 335 throw new XAException("Failed to store prepared transaction: " + xid); 336 } 337 338 } 339 finally { 340 try { 341 s.close(); 342 } 343 catch (Throwable e) { 344 } 345 } 346 } 347 348 public void doLoadPreparedTransactions(Connection c, RecoveryListener listener) throws SQLException { 349 PreparedStatement s = null; 350 ResultSet rs = null; 351 try { 352 353 s = c.prepareStatement(statementProvider.getFindAllXidStatment()); 354 rs = s.executeQuery(); 355 356 while (rs.next()) { 357 String id = rs.getString(1); 358 359 360 /* 361 byte data[] = this.getBinaryData(rs, 2); 362 try { 363 ActiveMQXid xid = new ActiveMQXid(id); 364 Transaction transaction = XATransactionCommand.fromBytes(data); 365 transactionManager.loadTransaction(xid, transaction); 366 } 367 catch (Exception e) { 368 log.error("Failed to recover prepared transaction due to invalid xid: " + id, e); 369 } 370 */ 371 } 372 } 373 finally { 374 try { 375 rs.close(); 376 } 377 catch (Throwable e) { 378 } 379 try { 380 s.close(); 381 } 382 catch (Throwable e) { 383 } 384 } 385 } 386 387 /** 388 * @throws JMSException 389 * @see org.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long) 390 */ 391 public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException { 392 PreparedStatement s = null; 393 try { 394 s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub()); 395 s.setLong(1, seq); 396 s.setString(2, subscriptionID); 397 s.setString(3, destinationName); 398 399 if (s.executeUpdate() != 1) { 400 throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID); 401 } 402 } 403 finally { 404 try { 405 s.close(); 406 } 407 catch (Throwable e) { 408 } 409 } 410 } 411 412 /** 413 * @throws JMSException 414 * @see org.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler) 415 */ 416 public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException { 417 // dumpTables(c, destinationName, subscriptionID); 418 419 PreparedStatement s = null; 420 ResultSet rs = null; 421 try { 422 423 // System.out.println(statementProvider.getFindAllDurableSubMessagesStatment()); 424 s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment()); 425 s.setString(1, destinationName); 426 s.setString(2, subscriptionID); 427 rs = s.executeQuery(); 428 429 while (rs.next()) { 430 long seq = rs.getLong(1); 431 String msgid = rs.getString(2); 432 listener.onMessage(seq, msgid); 433 } 434 435 } 436 finally { 437 try { 438 rs.close(); 439 } 440 catch (Throwable e) { 441 } 442 try { 443 s.close(); 444 } 445 catch (Throwable e) { 446 } 447 } 448 } 449 450 /** 451 * @see org.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.activemq.service.SubscriberEntry) 452 */ 453 public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException { 454 455 PreparedStatement s = null; 456 try { 457 s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment()); 458 s.setInt(1, subscriberEntry.getSubscriberID()); 459 s.setString(2, subscriberEntry.getClientID()); 460 s.setString(3, subscriberEntry.getConsumerName()); 461 s.setString(4, subscriberEntry.getSelector()); 462 s.setString(5, sub); 463 s.setString(6, destinationName); 464 465 // If the sub was not there then we need to create it. 466 if (s.executeUpdate() != 1) { 467 s.close(); 468 469 long id=0; 470 ResultSet rs=null; 471 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs()); 472 try { 473 rs = s.executeQuery(); 474 if (rs.next()) { 475 id = rs.getLong(1); 476 } 477 } finally { 478 try { 479 rs.close(); 480 } catch (Throwable e) { 481 } 482 } 483 s.close(); 484 485 s = c.prepareStatement(statementProvider.getCreateDurableSubStatment()); 486 s.setInt(1, subscriberEntry.getSubscriberID()); 487 s.setString(2, subscriberEntry.getClientID()); 488 s.setString(3, subscriberEntry.getConsumerName()); 489 s.setString(4, subscriberEntry.getSelector()); 490 s.setString(5, sub); 491 s.setString(6, destinationName); 492 493 s.setLong(7, id); 494 495 if (s.executeUpdate() != 1) { 496 log.error("Failed to store durable subscription for: " + sub); 497 } 498 } 499 } 500 finally { 501 try { 502 s.close(); 503 } 504 catch (Throwable e) { 505 } 506 } 507 } 508 509 /** 510 * @see org.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object) 511 */ 512 public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException { 513 PreparedStatement s = null; 514 ResultSet rs = null; 515 try { 516 517 s = c.prepareStatement(statementProvider.getFindDurableSubStatment()); 518 s.setString(1, sub); 519 s.setString(2, destinationName); 520 rs = s.executeQuery(); 521 522 if (!rs.next()) { 523 return null; 524 } 525 526 SubscriberEntry answer = new SubscriberEntry(); 527 answer.setSubscriberID(rs.getInt(1)); 528 answer.setClientID(rs.getString(2)); 529 answer.setConsumerName(rs.getString(3)); 530 answer.setDestination(rs.getString(4)); 531 532 return answer; 533 534 } 535 finally { 536 try { 537 rs.close(); 538 } 539 catch (Throwable e) { 540 } 541 try { 542 s.close(); 543 } 544 catch (Throwable e) { 545 } 546 } 547 } 548 549 public void doRemoveAllMessages(Connection c, String destinationName) throws SQLException, JMSException { 550 PreparedStatement s = null; 551 try { 552 s = c.prepareStatement(statementProvider.getRemoveAllMessagesStatment()); 553 s.setString(1, destinationName); 554 s.executeUpdate(); 555 s.close(); 556 557 s = c.prepareStatement(statementProvider.getRemoveAllSubscriptionsStatment()); 558 s.setString(1, destinationName); 559 s.executeUpdate(); 560 561 } 562 finally { 563 try { 564 s.close(); 565 } 566 catch (Throwable e) { 567 } 568 } 569 } 570 571 public void doDeleteSubscription(Connection c, String destinationName, String subscription) throws SQLException, JMSException { 572 PreparedStatement s = null; 573 try { 574 s = c.prepareStatement(statementProvider.getDeleteSubscriptionStatment()); 575 s.setString(1, subscription); 576 s.setString(2, destinationName); 577 578 s.executeUpdate(); 579 } 580 finally { 581 try { 582 s.close(); 583 } 584 catch (Throwable e) { 585 } 586 } 587 } 588 589 public void doDeleteOldMessages(Connection c) throws SQLException, JMSException { 590 PreparedStatement s = null; 591 try { 592 s = c.prepareStatement(statementProvider.getDeleteOldMessagesStatment()); 593 s.setLong(1, System.currentTimeMillis()); 594 int i = s.executeUpdate(); 595 log.debug("Deleted "+i+" old message(s)."); 596 } 597 finally { 598 try { 599 s.close(); 600 } 601 catch (Throwable e) { 602 } 603 } 604 } 605 606 /* 607 * Usefull for debuging. 608 * 609 public void dumpTables(Connection c, String destinationName, String subscriptionID) throws SQLException { 610 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 611 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 612 PreparedStatement s = c.prepareStatement("SELECT M.ID, M.MSGID " + 613 "FROM ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D " + 614 "WHERE D.CONTAINER=? AND D.SUB=? " + 615 "AND M.CONTAINER=D.CONTAINER " + 616 "AND M.ID > D.LAST_ACKED_ID " + 617 "ORDER BY M.ID"); 618 s.setString(1,destinationName); 619 s.setString(2,subscriptionID); 620 printQuery(s,System.out); 621 } 622 623 private void printQuery(Connection c, String query, PrintStream out) throws SQLException { 624 printQuery(c.prepareStatement(query), out); 625 } 626 627 private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { 628 629 ResultSet set=null; 630 try { 631 set = s.executeQuery(); 632 ResultSetMetaData metaData = set.getMetaData(); 633 for( int i=1; i<= metaData.getColumnCount(); i++ ) { 634 if(i==1) 635 out.print("||"); 636 out.print(metaData.getColumnName(i)+"||"); 637 } 638 out.println(); 639 while(set.next()) { 640 for( int i=1; i<= metaData.getColumnCount(); i++ ) { 641 if(i==1) 642 out.print("|"); 643 out.print(set.getString(i)+"|"); 644 } 645 out.println(); 646 } 647 } finally { 648 try { set.close(); } catch (Throwable ignore) {} 649 try { s.close(); } catch (Throwable ignore) {} 650 } 651 } 652 */ 653 }