001    /**
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.jdbc.adapter;
019    
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Statement;
025    
026    import javax.jms.JMSException;
027    import javax.transaction.xa.XAException;
028    
029    import org.activemq.message.ActiveMQXid;
030    import org.activemq.service.SubscriberEntry;
031    import org.activemq.store.TransactionStore.RecoveryListener;
032    import org.activemq.store.jdbc.JDBCAdapter;
033    import org.activemq.store.jdbc.StatementProvider;
034    import org.activemq.util.LongSequenceGenerator;
035    import org.apache.commons.logging.Log;
036    import org.apache.commons.logging.LogFactory;
037    
038    /**
039     * Implements all the default JDBC operations that are used
040     * by the JDBCPersistenceAdapter.
041     * <p/>
042     * Subclassing is encouraged to override the default
043     * implementation of methods to account for differences
044     * in JDBC Driver implementations.
045     * <p/>
046     * The JDBCAdapter inserts and extracts BLOB data using the
047     * getBytes()/setBytes() operations.
048     * <p/>
049     * The databases/JDBC drivers that use this adapter are:
050     * <ul>
051     * <li></li>
052     * </ul>
053     *
054     * @version $Revision: 1.1 $
055     */
056    public class DefaultJDBCAdapter implements JDBCAdapter {
057    
058        private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
059    
060        final protected StatementProvider statementProvider;
061        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
062    
063        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
064            s.setBytes(index, data);
065        }
066    
067        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
068            return rs.getBytes(index);
069        }
070    
071        /**
072         * @param provider
073         */
074        public DefaultJDBCAdapter(StatementProvider provider) {
075            this.statementProvider = new CachingStatementProvider(provider);
076        }
077    
078        public DefaultJDBCAdapter() {
079            this(new DefaultStatementProvider());
080        }
081    
082        public LongSequenceGenerator getSequenceGenerator() {
083            return sequenceGenerator;
084        }
085    
086        public void doCreateTables(Connection c) throws SQLException {
087            Statement s = null;
088            try {
089                s = c.createStatement();
090                String[] createStatments = statementProvider.getCreateSchemaStatments();
091                for (int i = 0; i < createStatments.length; i++) {
092                    // This will fail usually since the tables will be
093                    // created allready.
094                    try {
095                        boolean rc = s.execute(createStatments[i]);
096                    }
097                    catch (SQLException e) {
098                        log.info("Could not create JDBC tables; they could already exist." +
099                            " Failure was: " + createStatments[i] + " Message: " + e.getMessage() +
100                            " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
101                    }
102                }
103                c.commit();
104            }
105            finally {
106                try {
107                    s.close();
108                }
109                catch (Throwable e) {
110                }
111            }
112        }
113    
114        public void doDropTables(Connection c) throws SQLException {
115            Statement s = null;
116            try {
117                s = c.createStatement();
118                String[] dropStatments = statementProvider.getDropSchemaStatments();
119                for (int i = 0; i < dropStatments.length; i++) {
120                    // This will fail usually since the tables will be
121                    // created allready.
122                    try {
123                        boolean rc = s.execute(dropStatments[i]);
124                    }
125                    catch (SQLException e) {
126                        log.warn("Could not drop JDBC tables; they may not exist." +
127                            " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() +
128                            " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
129                    }
130                }
131                c.commit();
132            }
133            finally {
134                try {
135                    s.close();
136                }
137                catch (Throwable e) {
138                }
139            }
140        }
141    
142        public void initSequenceGenerator(Connection c) {
143            PreparedStatement s = null;
144            ResultSet rs = null;
145            try {
146                s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
147                rs = s.executeQuery();
148                long seq1 = 0;
149                if (rs.next()) {
150                    seq1 = rs.getLong(1);
151                }
152                rs.close();
153                s.close();
154                s = c.prepareStatement(statementProvider.getFindLastSequenceIdInAcks());
155                rs = s.executeQuery();
156                long seq2 = 0;
157                if (rs.next()) {
158                    seq2 = rs.getLong(1);
159                }
160                
161                sequenceGenerator.setLastSequenceId(Math.max(seq1, seq2));
162                log.debug("Last sequence id: "+sequenceGenerator.getLastSequenceId());
163            }
164            catch (SQLException e) {
165                log.warn("Failed to find last sequence number: " + e, e);
166            }
167            finally {
168                try {
169                    rs.close();
170                }
171                catch (Throwable e) {
172                }
173                try {
174                    s.close();
175                }
176                catch (Throwable e) {
177                }
178            }
179        }
180    
181        public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data, long expiration) throws SQLException, JMSException {
182            PreparedStatement s = null;
183            try {
184                s = c.prepareStatement(statementProvider.getAddMessageStatment());
185                s.setLong(1, seq);
186                s.setString(2, destinationName);
187                s.setString(3, messageID);
188                setBinaryData(s, 4, data);
189                s.setLong(5, expiration);
190                if (s.executeUpdate() != 1) {
191                    throw new JMSException("Failed to broker message: " + messageID + " in container.  ");
192                }
193            }
194            finally {
195                try {
196                    s.close();
197                }
198                catch (Throwable e) {
199                }
200            }
201        }
202    
203            public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException {
204            PreparedStatement s = null;
205            ResultSet rs = null;
206            try {
207    
208                s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment());
209                s.setString(1, messageID);
210                rs = s.executeQuery();
211    
212                if (!rs.next()) {
213                    return null;
214                }
215                return new Long( rs.getLong(1) );
216    
217            }
218            finally {
219                try {
220                    rs.close();
221                }
222                catch (Throwable e) {
223                }
224                try {
225                    s.close();
226                }
227                catch (Throwable e) {
228                }
229            }
230            }
231    
232        public byte[] doGetMessage(Connection c, long seq) throws SQLException {
233            PreparedStatement s = null;
234            ResultSet rs = null;
235            try {
236    
237                s = c.prepareStatement(statementProvider.getFindMessageStatment());
238                s.setLong(1, seq);
239                rs = s.executeQuery();
240    
241                if (!rs.next()) {
242                    return null;
243                }
244                return getBinaryData(rs, 1);
245    
246            }
247            finally {
248                try {
249                    rs.close();
250                }
251                catch (Throwable e) {
252                }
253                try {
254                    s.close();
255                }
256                catch (Throwable e) {
257                }
258            }
259        }
260    
261        public void doRemoveMessage(Connection c, long seq) throws SQLException {
262            PreparedStatement s = null;
263            try {
264                s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
265                s.setLong(1, seq);
266                if (s.executeUpdate() != 1) {
267                    log.error("Could not delete sequenece number for: " + seq);
268                }
269            }
270            finally {
271                try {
272                    s.close();
273                }
274                catch (Throwable e) {
275                }
276            }
277        }
278    
279        public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException {
280            PreparedStatement s = null;
281            ResultSet rs = null;
282            try {
283    
284                s = c.prepareStatement(statementProvider.getFindAllMessagesStatment());
285                s.setString(1, destinationName);
286                rs = s.executeQuery();
287    
288                while (rs.next()) {
289                    long seq = rs.getLong(1);
290                    String msgid = rs.getString(2);
291                    listener.onMessage(seq, msgid);
292                }
293    
294            }
295            finally {
296                try {
297                    rs.close();
298                }
299                catch (Throwable e) {
300                }
301                try {
302                    s.close();
303                }
304                catch (Throwable e) {
305                }
306            }
307        }
308    
309        public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
310            PreparedStatement s = null;
311            try {
312                s = c.prepareStatement(statementProvider.getRemoveXidStatment());
313                s.setString(1, xid.toLocalTransactionId());
314                if (s.executeUpdate() != 1) {
315                    throw new XAException("Failed to remove prepared transaction: " + xid + ".");
316                }
317            }
318            finally {
319                try {
320                    s.close();
321                }
322                catch (Throwable e) {
323                }
324            }
325        }
326    
327    
328        public void doAddXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
329            PreparedStatement s = null;
330            try {
331    
332                s = c.prepareStatement(statementProvider.getAddXidStatment());
333                s.setString(1, xid.toLocalTransactionId());
334                if (s.executeUpdate() != 1) {
335                    throw new XAException("Failed to store prepared transaction: " + xid);
336                }
337    
338            }
339            finally {
340                try {
341                    s.close();
342                }
343                catch (Throwable e) {
344                }
345            }
346        }
347    
348        public void doLoadPreparedTransactions(Connection c, RecoveryListener listener) throws SQLException {
349            PreparedStatement s = null;
350            ResultSet rs = null;
351            try {
352    
353                s = c.prepareStatement(statementProvider.getFindAllXidStatment());
354                rs = s.executeQuery();
355    
356                while (rs.next()) {
357                    String id = rs.getString(1);
358                    
359                    
360                    /*
361                    byte data[] = this.getBinaryData(rs, 2);
362                    try {
363                        ActiveMQXid xid = new ActiveMQXid(id);
364                        Transaction transaction = XATransactionCommand.fromBytes(data);
365                        transactionManager.loadTransaction(xid, transaction);
366                    }
367                    catch (Exception e) {
368                        log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
369                    }
370                    */
371                }
372            }
373            finally {
374                try {
375                    rs.close();
376                }
377                catch (Throwable e) {
378                }
379                try {
380                    s.close();
381                }
382                catch (Throwable e) {
383                }
384            }
385        }
386        
387        /**
388         * @throws JMSException
389         * @see org.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long)
390         */
391        public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException {
392            PreparedStatement s = null;
393            try {
394                s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
395                s.setLong(1, seq);
396                s.setString(2, subscriptionID);
397                s.setString(3, destinationName);
398    
399                if (s.executeUpdate() != 1) {
400                    throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
401                }
402            }
403            finally {
404                try {
405                    s.close();
406                }
407                catch (Throwable e) {
408                }
409            }
410        }
411    
412        /**
413         * @throws JMSException
414         * @see org.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler)
415         */
416        public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException {
417    //        dumpTables(c, destinationName, subscriptionID);
418    
419            PreparedStatement s = null;
420            ResultSet rs = null;
421            try {
422    
423    //            System.out.println(statementProvider.getFindAllDurableSubMessagesStatment());
424                s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
425                s.setString(1, destinationName);
426                s.setString(2, subscriptionID);
427                rs = s.executeQuery();
428    
429                while (rs.next()) {
430                    long seq = rs.getLong(1);
431                    String msgid = rs.getString(2);
432                    listener.onMessage(seq, msgid);
433                }
434    
435            }
436            finally {
437                try {
438                    rs.close();
439                }
440                catch (Throwable e) {
441                }
442                try {
443                    s.close();
444                }
445                catch (Throwable e) {
446                }
447            }
448        }
449    
450        /**
451         * @see org.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.activemq.service.SubscriberEntry)
452         */
453        public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException {
454    
455            PreparedStatement s = null;
456            try {
457                s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment());
458                s.setInt(1, subscriberEntry.getSubscriberID());
459                s.setString(2, subscriberEntry.getClientID());
460                s.setString(3, subscriberEntry.getConsumerName());
461                s.setString(4, subscriberEntry.getSelector());
462                s.setString(5, sub);
463                s.setString(6, destinationName);
464                
465                // If the sub was not there then we need to create it.
466                if (s.executeUpdate() != 1) {
467                    s.close();
468                    
469                    long id=0;
470                    ResultSet rs=null;
471                    s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
472                    try {
473                        rs = s.executeQuery();
474                        if (rs.next()) {
475                            id = rs.getLong(1);
476                        }
477                    } finally {
478                        try {
479                            rs.close();
480                        } catch (Throwable e) {
481                        }
482                    }
483                    s.close();
484                    
485                    s = c.prepareStatement(statementProvider.getCreateDurableSubStatment());
486                    s.setInt(1, subscriberEntry.getSubscriberID());
487                    s.setString(2, subscriberEntry.getClientID());
488                    s.setString(3, subscriberEntry.getConsumerName());
489                    s.setString(4, subscriberEntry.getSelector());
490                    s.setString(5, sub);
491                    s.setString(6, destinationName);
492                    
493                    s.setLong(7, id);
494    
495                    if (s.executeUpdate() != 1) {
496                        log.error("Failed to store durable subscription for: " + sub);
497                    }
498                }
499            }
500            finally {
501                try {
502                    s.close();
503                }
504                catch (Throwable e) {
505                }
506            }
507        }
508    
509        /**
510         * @see org.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object)
511         */
512        public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException {
513            PreparedStatement s = null;
514            ResultSet rs = null;
515            try {
516    
517                s = c.prepareStatement(statementProvider.getFindDurableSubStatment());
518                s.setString(1, sub);
519                s.setString(2, destinationName);
520                rs = s.executeQuery();
521    
522                if (!rs.next()) {
523                    return null;
524                }
525    
526                SubscriberEntry answer = new SubscriberEntry();
527                answer.setSubscriberID(rs.getInt(1));
528                answer.setClientID(rs.getString(2));
529                answer.setConsumerName(rs.getString(3));
530                answer.setDestination(rs.getString(4));
531    
532                return answer;
533    
534            }
535            finally {
536                try {
537                    rs.close();
538                }
539                catch (Throwable e) {
540                }
541                try {
542                    s.close();
543                }
544                catch (Throwable e) {
545                }
546            }
547        }
548    
549        public void doRemoveAllMessages(Connection c, String destinationName) throws SQLException, JMSException {
550            PreparedStatement s = null;
551            try {
552                s = c.prepareStatement(statementProvider.getRemoveAllMessagesStatment());
553                s.setString(1, destinationName);
554                s.executeUpdate();
555                s.close();
556                
557                s = c.prepareStatement(statementProvider.getRemoveAllSubscriptionsStatment());
558                s.setString(1, destinationName);
559                s.executeUpdate();
560                
561            }
562            finally {
563                try {
564                    s.close();
565                }
566                catch (Throwable e) {
567                }
568            }
569        }
570    
571        public void doDeleteSubscription(Connection c, String destinationName, String subscription) throws SQLException, JMSException {
572            PreparedStatement s = null;
573            try {
574                s = c.prepareStatement(statementProvider.getDeleteSubscriptionStatment());
575                s.setString(1, subscription);
576                s.setString(2, destinationName);
577    
578                s.executeUpdate();
579            }
580            finally {
581                try {
582                    s.close();
583                }
584                catch (Throwable e) {
585                }
586            }
587        }
588    
589        public void doDeleteOldMessages(Connection c) throws SQLException, JMSException {
590            PreparedStatement s = null;
591            try {
592                s = c.prepareStatement(statementProvider.getDeleteOldMessagesStatment());
593                s.setLong(1, System.currentTimeMillis());
594                int i = s.executeUpdate();
595                log.debug("Deleted "+i+" old message(s).");
596            }
597            finally {
598                try {
599                    s.close();
600                }
601                catch (Throwable e) {
602                }
603            }
604        }
605    
606        /*
607         * Usefull for debuging.
608         *
609        public void dumpTables(Connection c, String destinationName, String subscriptionID) throws SQLException {        
610            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
611            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
612            PreparedStatement s = c.prepareStatement("SELECT M.ID, M.MSGID " +
613                    "FROM ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D  " +
614                    "WHERE D.CONTAINER=? AND D.SUB=?  " +
615                    "AND M.CONTAINER=D.CONTAINER " +
616                    "AND M.ID > D.LAST_ACKED_ID " +
617                    "ORDER BY M.ID");
618            s.setString(1,destinationName);
619            s.setString(2,subscriptionID);        
620            printQuery(s,System.out);
621        }
622    
623        private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
624            printQuery(c.prepareStatement(query), out);
625        }
626        
627        private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
628            
629            ResultSet set=null;
630            try {
631                set = s.executeQuery();
632                ResultSetMetaData metaData = set.getMetaData();
633                for( int i=1; i<= metaData.getColumnCount(); i++ ) {
634                    if(i==1)
635                        out.print("||");
636                    out.print(metaData.getColumnName(i)+"||");
637                }
638                out.println();
639                while(set.next()) {
640                    for( int i=1; i<= metaData.getColumnCount(); i++ ) {
641                        if(i==1)
642                            out.print("|");
643                        out.print(set.getString(i)+"|");
644                    }
645                    out.println();
646                }
647            } finally {
648                try { set.close(); } catch (Throwable ignore) {}
649                try { s.close(); } catch (Throwable ignore) {}
650            }
651        }
652        */
653    }