View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdb;
19  
20  import com.sleepycat.je.Database;
21  import com.sleepycat.je.DatabaseConfig;
22  import com.sleepycat.je.DatabaseException;
23  import com.sleepycat.je.Environment;
24  import com.sleepycat.je.SecondaryConfig;
25  import com.sleepycat.je.SecondaryDatabase;
26  import com.sleepycat.je.SecondaryKeyCreator;
27  import com.sleepycat.je.Transaction;
28  import com.sleepycat.je.TransactionConfig;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.codehaus.activemq.message.DefaultWireFormat;
32  import org.codehaus.activemq.message.WireFormat;
33  import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
34  import org.codehaus.activemq.store.MessageStore;
35  import org.codehaus.activemq.store.PersistenceAdapter;
36  import org.codehaus.activemq.store.PreparedTransactionStore;
37  import org.codehaus.activemq.store.TopicMessageStore;
38  import org.codehaus.activemq.util.JMSExceptionHelper;
39  
40  import javax.jms.JMSException;
41  import java.io.File;
42  
43  /***
44   * A {@link PersistenceAdapter} implementation using
45   * <a href="http://www.sleepycat.com">Berkeley DB Java Edition</a>
46   *
47   * @version $Revision: 1.5 $
48   */
49  public class BDbPersistenceAdapter extends PersistenceAdapterSupport {
50      private static final Log log = LogFactory.getLog(BDbPersistenceAdapter.class);
51  
52      private Environment environment;
53      private WireFormat wireFormat;
54      private DatabaseConfig config;
55      private TransactionConfig transactionConfig;
56      private File directory = new File("ActiveMQ");
57  
58  
59      /***
60       * Factory method to create an instance using the defaults
61       *
62       * @param directory the directory in which to store the persistent files
63       * @return
64       * @throws JMSException
65       */
66      public static BDbPersistenceAdapter newInstance(File directory) throws JMSException {
67          return new BDbPersistenceAdapter(directory);
68      }
69  
70  
71      public BDbPersistenceAdapter() {
72          this(null, new DefaultWireFormat());
73      }
74  
75      public BDbPersistenceAdapter(File directory) {
76          this();
77          this.directory = directory;
78      }
79  
80      public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat) {
81          this(environment, wireFormat, BDbHelper.createDatabaseConfig(), new TransactionConfig());
82      }
83  
84      public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat, DatabaseConfig config, TransactionConfig transactionConfig) {
85          this.environment = environment;
86          this.wireFormat = wireFormat;
87          this.config = config;
88          this.transactionConfig = transactionConfig;
89      }
90  
91      public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
92          try {
93              Database database = createDatabase("Queue_" + destinationName);
94              SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
95              SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
96              SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Queue_Index_" + destinationName, database, secondaryConfig);
97              sequenceNumberCreator.initialise(secondaryDatabase);
98              return new BDbMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy());
99          }
100         catch (DatabaseException e) {
101             throw JMSExceptionHelper.newJMSException("Could not create Queue MessageContainer for destination: "
102                     + destinationName + ". Reason: " + e, e);
103         }
104     }
105 
106     public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
107         try {
108             Database database = createDatabase("Topic_" + destinationName);
109             SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
110             SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
111             SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Topic_Index_" + destinationName, database, secondaryConfig);
112             sequenceNumberCreator.initialise(secondaryDatabase);
113             Database subscriptionDatabase = createDatabase("ConsumeAck_" + destinationName);
114             return new BDbTopicMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy(), subscriptionDatabase);
115         }
116         catch (DatabaseException e) {
117             throw JMSExceptionHelper.newJMSException("Could not create Topic MessageContainer for destination: "
118                     + destinationName + ". Reason: " + e, e);
119         }
120     }
121 
122     public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
123         try {
124             return new BDbPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
125         }
126         catch (DatabaseException e) {
127             throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
128         }
129     }
130 
131     public void beginTransaction() throws JMSException {
132         try {
133             // TODO temporary hack until BDB supports nested transactions
134             if (BDbHelper.getTransactionCount() == 0) {
135                 Transaction transaction = environment.beginTransaction(BDbHelper.getTransaction(), transactionConfig);
136                 BDbHelper.pushTransaction(transaction);
137             }
138             else {
139                 Transaction transaction = BDbHelper.getTransaction();
140                 BDbHelper.pushTransaction(transaction);
141             }
142         }
143         catch (DatabaseException e) {
144             throw JMSExceptionHelper.newJMSException("Failed to begin transaction: " + e, e);
145         }
146     }
147 
148     public void commitTransaction() throws JMSException {
149         // TODO temporary hack until BDB supports nested transactions
150         if (BDbHelper.getTransactionCount() == 1) {
151             Transaction transaction = BDbHelper.getTransaction();
152             if (transaction == null) {
153                 log.warn("Attempt to commit transaction when non in progress");
154             }
155             else {
156                 try {
157                     transaction.commit();
158                 }
159                 catch (DatabaseException e) {
160                     throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + transaction + ": " + e, e);
161                 }
162                 finally {
163                     BDbHelper.popTransaction();
164                 }
165             }
166         }
167         else {
168             BDbHelper.popTransaction();
169         }
170     }
171 
172     public void rollbackTransaction() {
173         Transaction transaction = BDbHelper.getTransaction();
174         if (transaction != null) {
175             if (BDbHelper.getTransactionCount() == 1) {
176                 try {
177                     transaction.abort();
178                 }
179                 catch (DatabaseException e) {
180                     log.warn("Cannot rollback transaction due to: " + e, e);
181                 }
182                 finally {
183                     BDbHelper.popTransaction();
184                 }
185             }
186             else {
187                 BDbHelper.popTransaction();
188             }
189         }
190     }
191 
192 
193     public void start() throws JMSException {
194         if (environment == null) {
195             directory.mkdirs();
196 
197             log.info("Creating Berkeley DB based message store in directory: " + directory.getAbsolutePath());
198 
199             try {
200                 environment = BDbHelper.createEnvironment(directory);
201             }
202             catch (DatabaseException e) {
203                 throw JMSExceptionHelper.newJMSException("Failed to open Berkeley DB persistent store at directory: "
204                         + directory + ". Reason: " + e, e);
205             }
206         }
207     }
208 
209     public synchronized void stop() throws JMSException {
210         if (environment != null) {
211             try {
212                 environment.close();
213             }
214             catch (DatabaseException e) {
215                 throw JMSExceptionHelper.newJMSException("Failed to close environment. Reason: " + e, e);
216             }
217             finally {
218                 environment = null;
219             }
220         }
221     }
222 
223     // Properties
224     //-------------------------------------------------------------------------
225     public File getDirectory() {
226         return directory;
227     }
228 
229     public void setDirectory(File directory) {
230         this.directory = directory;
231     }
232 
233     public WireFormat getWireFormat() {
234         return wireFormat;
235     }
236 
237     public void setWireFormat(WireFormat wireFormat) {
238         this.wireFormat = wireFormat;
239     }
240 
241     public TransactionConfig getTransactionConfig() {
242         return transactionConfig;
243     }
244 
245     public void setTransactionConfig(TransactionConfig transactionConfig) {
246         this.transactionConfig = transactionConfig;
247     }
248 
249     public Environment getEnvironment() {
250         return environment;
251     }
252 
253     public void setEnvironment(Environment environment) {
254         this.environment = environment;
255     }
256 
257     public DatabaseConfig getConfig() {
258         return config;
259     }
260 
261     public void setConfig(DatabaseConfig config) {
262         this.config = config;
263     }
264 
265     // Implementation methods
266     //-------------------------------------------------------------------------
267     protected Database createDatabase(String name) throws DatabaseException {
268         //System.out.println("#####   Opening database: " + name);
269 
270         if (log.isTraceEnabled()) {
271             log.trace("Opening database: " + name);
272         }
273         return environment.openDatabase(null, name, config);
274     }
275 
276     protected SecondaryDatabase createSecondaryDatabase(String name, Database database, SecondaryConfig secondaryConfig) throws DatabaseException {
277         //System.out.println("#####   Opening secondary database: " + name);
278 
279         if (log.isTraceEnabled()) {
280             log.trace("Opening secondary database: " + name);
281         }
282         return environment.openSecondaryDatabase(null, name, database, secondaryConfig);
283     }
284 
285     public static JMSException closeDatabase(Database db, JMSException firstException) {
286         if (db != null) {
287 
288             if (log.isTraceEnabled()) {
289                 try {
290                     log.trace("Closing database: " + db.getDatabaseName());
291                 }
292                 catch (DatabaseException e) {
293                     log.trace("Closing database: " + db + " but could not get the name: " + e);
294                 }
295             }
296             try {
297                 //System.out.println("#####  Closing database: " + db.getDatabaseName() + " " + db);
298                 db.close();
299             }
300             catch (DatabaseException e) {
301                 if (firstException == null) {
302                     firstException = JMSExceptionHelper.newJMSException("Failed to close database. Reason: " + e, e);
303                 }
304             }
305         }
306         return firstException;
307     }
308 
309     protected SecondaryConfig createSecondaryConfig(SecondaryKeyCreator keyGenerator) {
310         SecondaryConfig answer = new SecondaryConfig();
311         answer.setKeyCreator(keyGenerator);
312         answer.setAllowCreate(true);
313         answer.setAllowPopulate(true);
314         answer.setTransactional(true);
315         return answer;
316     }
317 }