001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc; 019 020 import java.sql.Connection; 021 import java.sql.SQLException; 022 import java.util.Map; 023 024 import javax.jms.JMSException; 025 import javax.sql.DataSource; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.activemq.io.WireFormat; 030 import org.activemq.io.impl.StatelessDefaultWireFormat; 031 import org.activemq.store.MessageStore; 032 import org.activemq.store.PersistenceAdapter; 033 import org.activemq.store.TopicMessageStore; 034 import org.activemq.store.TransactionStore; 035 import org.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 036 import org.activemq.store.vm.VMTransactionStore; 037 import org.activemq.util.FactoryFinder; 038 import org.activemq.util.JMSExceptionHelper; 039 040 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; 041 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 042 043 /** 044 * A {@link PersistenceAdapter} implementation using JDBC for 045 * persistence storage. 046 * 047 * This persistence adapter will correctly remember prepared XA transactions, 048 * but it will not keep track of local transaction commits so that operations 049 * performed against the Message store are done as a single uow. 050 * 051 * @version $Revision: 1.1 $ 052 */ 053 public class JDBCPersistenceAdapter implements PersistenceAdapter { 054 055 private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class); 056 private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/activemq/store/jdbc/"); 057 058 private WireFormat wireFormat = new StatelessDefaultWireFormat(); 059 private DataSource dataSource; 060 private JDBCAdapter adapter; 061 private String adapterClass; 062 private VMTransactionStore transactionStore; 063 private boolean dropTablesOnStartup=false; 064 private ClockDaemon clockDaemon; 065 private Object clockTicket; 066 private int cleanupPeriod = 1000 * 60 * 5; 067 068 public JDBCPersistenceAdapter() { 069 } 070 071 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 072 this.dataSource = ds; 073 this.wireFormat = wireFormat; 074 } 075 076 public Map getInitialDestinations() { 077 return null; 078 } 079 080 public MessageStore createQueueMessageStore(String destinationName) throws JMSException { 081 if (adapter == null) { 082 throw new IllegalStateException("Not started"); 083 } 084 MessageStore store = new JDBCMessageStore(this, adapter, wireFormat.copy(), destinationName); 085 if( transactionStore!=null ) { 086 store = transactionStore.proxy(store); 087 } 088 return store; 089 } 090 091 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException { 092 if (adapter == null) { 093 throw new IllegalStateException("Not started"); 094 } 095 TopicMessageStore store = new JDBCTopicMessageStore(this, adapter, wireFormat.copy(), destinationName); 096 if( transactionStore!=null ) { 097 store = transactionStore.proxy(store); 098 } 099 return store; 100 } 101 102 public TransactionStore createTransactionStore() throws JMSException { 103 if (adapter == null) { 104 throw new IllegalStateException("Not started"); 105 } 106 if( this.transactionStore == null ) { 107 this.transactionStore = new VMTransactionStore(); 108 } 109 return this.transactionStore; 110 } 111 112 public void beginTransaction() throws JMSException { 113 try { 114 Connection c = dataSource.getConnection(); 115 c.setAutoCommit(false); 116 TransactionContext.pushConnection(c); 117 } 118 catch (SQLException e) { 119 throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e); 120 } 121 } 122 123 public void commitTransaction() throws JMSException { 124 Connection c = TransactionContext.popConnection(); 125 if (c == null) { 126 log.warn("Commit while no transaction in progress"); 127 } 128 else { 129 try { 130 c.commit(); 131 } 132 catch (SQLException e) { 133 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e); 134 } 135 finally { 136 try { 137 c.close(); 138 } 139 catch (Throwable e) { 140 } 141 } 142 } 143 } 144 145 public void rollbackTransaction() { 146 Connection c = TransactionContext.popConnection(); 147 try { 148 c.rollback(); 149 } 150 catch (SQLException e) { 151 log.warn("Cannot rollback transaction due to: " + e, e); 152 } 153 finally { 154 try { 155 c.close(); 156 } 157 catch (Throwable e) { 158 } 159 } 160 } 161 162 163 public void start() throws JMSException { 164 beginTransaction(); 165 Connection c = null; 166 try { 167 // Load the right adapter for the database 168 adapter = null; 169 170 try { 171 c = getConnection(); 172 } 173 catch (SQLException e) { 174 throw JMSExceptionHelper.newJMSException("Could not get a database connection: "+e,e); 175 } 176 177 // If the adapter class is not specified.. try to dectect they right type by getting 178 // info from the database. 179 if( adapterClass == null ) { 180 181 try { 182 183 // Make the filename file system safe. 184 String dirverName = c.getMetaData().getDriverName(); 185 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(); 186 187 try { 188 adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName); 189 log.info("Database driver recognized: [" + dirverName + "]"); 190 } 191 catch (Throwable e) { 192 log.warn("Database driver NOT recognized: [" + dirverName + "]. Will use default JDBC implementation."); 193 } 194 195 } 196 catch (SQLException e) { 197 log.warn("JDBC error occured while trying to detect database type. Will use default JDBC implementation: "+e.getMessage()); 198 log.debug("Reason: " + e, e); 199 } 200 201 } else { 202 try { 203 Class clazz = JDBCPersistenceAdapter.class.getClassLoader().loadClass(adapterClass); 204 adapter = (DefaultJDBCAdapter)clazz.newInstance(); 205 } 206 catch (Throwable e) { 207 log.warn("Invalid JDBC adapter class class (" + adapterClass + "). Will use default JDBC implementation."); 208 log.debug("Reason: " + e, e); 209 } 210 } 211 212 // Use the default JDBC adapter if the 213 // Database type is not recognized. 214 if (adapter == null) { 215 adapter = new DefaultJDBCAdapter(); 216 } 217 218 if( dropTablesOnStartup ) { 219 try { 220 adapter.doDropTables(c); 221 } 222 catch (SQLException e) { 223 log.warn("Cannot drop tables due to: " + e, e); 224 } 225 } 226 try { 227 adapter.doCreateTables(c); 228 } 229 catch (SQLException e) { 230 log.warn("Cannot create tables due to: " + e, e); 231 } 232 adapter.initSequenceGenerator(c); 233 234 } 235 finally { 236 commitTransaction(); 237 } 238 239 // Cleanup the db periodically. 240 if( cleanupPeriod > 0 ) { 241 clockTicket = getClockDaemon().executePeriodically(cleanupPeriod, new Runnable() { 242 public void run() { 243 cleanup(); 244 } 245 }, false); 246 } 247 cleanup(); 248 } 249 250 public void cleanup() { 251 Connection c = null; 252 try { 253 log.debug("Cleaning up old messages."); 254 c = getConnection(); 255 adapter.doDeleteOldMessages(c); 256 } catch (JMSException e) { 257 log.warn("Old message cleanup failed due to: " + e, e); 258 } catch (SQLException e) { 259 log.warn("Old message cleanup failed due to: " + e, e); 260 } finally { 261 returnConnection(c); 262 log.debug("Cleanup done."); 263 } 264 } 265 266 public void setClockDaemon(ClockDaemon clockDaemon) { 267 this.clockDaemon = clockDaemon; 268 } 269 270 public ClockDaemon getClockDaemon() { 271 if (clockDaemon == null) { 272 clockDaemon = new ClockDaemon(); 273 clockDaemon.setThreadFactory(new ThreadFactory() { 274 public Thread newThread(Runnable runnable) { 275 Thread thread = new Thread(runnable, "Cleanup Timmer"); 276 thread.setDaemon(true); 277 return thread; 278 } 279 }); 280 } 281 return clockDaemon; 282 } 283 284 public synchronized void stop() throws JMSException { 285 if (clockTicket != null) { 286 // Stop the periodical cleanup. 287 ClockDaemon.cancel(clockTicket); 288 clockTicket=null; 289 clockDaemon.shutDown(); 290 } 291 } 292 293 public DataSource getDataSource() { 294 return dataSource; 295 } 296 297 public void setDataSource(DataSource dataSource) { 298 this.dataSource = dataSource; 299 } 300 301 public WireFormat getWireFormat() { 302 return wireFormat; 303 } 304 305 public void setWireFormat(WireFormat wireFormat) { 306 this.wireFormat = wireFormat; 307 } 308 309 public Connection getConnection() throws SQLException { 310 Connection answer = TransactionContext.peekConnection(); 311 if (answer == null) { 312 answer = dataSource.getConnection(); 313 answer.setAutoCommit(true); 314 } 315 return answer; 316 } 317 318 public void returnConnection(Connection connection) { 319 if (connection == null) { 320 return; 321 } 322 Connection peek = TransactionContext.peekConnection(); 323 if (peek != connection) { 324 try { 325 connection.close(); 326 } 327 catch (SQLException e) { 328 } 329 } 330 } 331 332 /** 333 * @return Returns the adapterClass. 334 */ 335 public String getAdapterClass() { 336 return adapterClass; 337 } 338 339 /** 340 * @param adapterClass The adapterClass to set. 341 */ 342 public void setAdapterClass(String adapterClass) { 343 this.adapterClass = adapterClass; 344 } 345 /** 346 * @return Returns the dropTablesOnStartup. 347 */ 348 public boolean getDropTablesOnStartup() { 349 return dropTablesOnStartup; 350 } 351 /** 352 * @param dropTablesOnStartup The dropTablesOnStartup to set. 353 */ 354 public void setDropTablesOnStartup(boolean dropTablesOnStartup) { 355 this.dropTablesOnStartup = dropTablesOnStartup; 356 } 357 358 public int getCleanupPeriod() { 359 return cleanupPeriod; 360 } 361 362 public void setCleanupPeriod(int cleanupPeriod) { 363 this.cleanupPeriod = cleanupPeriod; 364 } 365 }