View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbc;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.WireFormat;
23  import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
24  import org.codehaus.activemq.store.MessageStore;
25  import org.codehaus.activemq.store.PersistenceAdapter;
26  import org.codehaus.activemq.store.PreparedTransactionStore;
27  import org.codehaus.activemq.store.TopicMessageStore;
28  import org.codehaus.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
29  import org.codehaus.activemq.util.FactoryFinder;
30  import org.codehaus.activemq.util.JMSExceptionHelper;
31  
32  import javax.jms.JMSException;
33  import javax.sql.DataSource;
34  import java.sql.Connection;
35  import java.sql.SQLException;
36  
37  /***
38   * A {@link PersistenceAdapter} implementation using JDBC for
39   * persistence storage.
40   *
41   * @version $Revision: 1.4 $
42   */
43  public class JDBCPersistenceAdapter extends PersistenceAdapterSupport {
44  
45      private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
46      private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/codehaus/activemq/store/jdbc/");
47  
48      private final WireFormat wireFormat;
49      private final DataSource ds;
50      private JDBCAdapter adapter;
51  
52  
53      public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
54          this.ds = ds;
55          this.wireFormat = wireFormat;
56      }
57  
58      public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
59          if (adapter == null) {
60              throw new IllegalStateException("Not started");
61          }
62          return new JDBCMessageStore(adapter, wireFormat, destinationName);
63      }
64  
65      public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
66          if (adapter == null) {
67              throw new IllegalStateException("Not started");
68          }
69          return new JDBCTopicMessageStore(adapter, wireFormat, destinationName);
70      }
71  
72      public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
73          if (adapter == null) {
74              throw new IllegalStateException("Not started");
75          }
76          return new JDBCPreparedTransactionStore(adapter, wireFormat);
77      }
78  
79      public void beginTransaction() throws JMSException {
80          try {
81              Connection c = ds.getConnection();
82              c.setAutoCommit(false);
83              TransactionContext.pushConnection(c);
84          }
85          catch (SQLException e) {
86              throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
87          }
88      }
89  
90      public void commitTransaction() throws JMSException {
91          Connection c = TransactionContext.popConnection();
92          if (c == null) {
93              log.warn("Commit while no transaction in progress");
94          }
95          else {
96              try {
97                  c.commit();
98              }
99              catch (SQLException e) {
100                 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
101             }
102             finally {
103                 try {
104                     c.close();
105                 }
106                 catch (Throwable e) {
107                 }
108             }
109         }
110     }
111 
112     public void rollbackTransaction() {
113         Connection c = TransactionContext.popConnection();
114         try {
115             c.rollback();
116         }
117         catch (SQLException e) {
118             log.warn("Cannot rollback transaction due to: " + e, e);
119         }
120         finally {
121             try {
122                 c.close();
123             }
124             catch (Throwable e) {
125             }
126         }
127     }
128 
129 
130     public void start() throws JMSException {
131         beginTransaction();
132         try {
133             Connection c = TransactionContext.getConnection();
134             
135             // Choose the right adapter depending on the
136             // databse connection.
137             adapter = null;
138             String database = null;
139             try {
140                 database = c.getMetaData().getDriverName();
141                 database = database.replaceAll(" ", "_");
142 
143                 log.debug("Database type: [" + database + "]");
144                 try {
145                     adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(database);
146                 }
147                 catch (Throwable e) {
148                     log.warn("Unrecognized database type (" + database + ").  Will use default JDBC implementation", e);
149                 }
150             }
151             catch (SQLException e1) {
152             }
153             
154             // Use the default JDBC adapter if the 
155             // Database type is not recognized.
156             if (adapter == null) {
157                 adapter = new DefaultJDBCAdapter();
158             }
159 
160             try {
161                 adapter.doCreateTables(c);
162             }
163             catch (SQLException e) {
164                 log.warn("Cannot create tables due to: " + e, e);
165             }
166             adapter.initSequenceGenerator(c);
167 
168         }
169         finally {
170             commitTransaction();
171         }
172     }
173 
174     public synchronized void stop() throws JMSException {
175     }
176 
177 }