1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbm;
19
20 import jdbm.RecordManager;
21 import jdbm.RecordManagerFactory;
22 import jdbm.btree.BTree;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.AlreadyClosedException;
26 import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
27 import org.codehaus.activemq.store.MessageStore;
28 import org.codehaus.activemq.store.PreparedTransactionStore;
29 import org.codehaus.activemq.store.TopicMessageStore;
30 import org.codehaus.activemq.util.DefaultComparator;
31 import org.codehaus.activemq.util.JMSExceptionHelper;
32
33 import javax.jms.JMSException;
34 import java.io.File;
35 import java.io.IOException;
36 import java.util.Comparator;
37 import java.util.Properties;
38
39 /***
40 * A {@link org.codehaus.activemq.store.PersistenceAdapter} implementation for
41 * <a href="http://jdbm.sf.net/">JDBM</a>
42 *
43 * @version $Revision: 1.5 $
44 */
45 public class JdbmPersistenceAdapter extends PersistenceAdapterSupport {
46
47 private static final Log log = LogFactory.getLog(JdbmPersistenceAdapter.class);
48
49 private RecordManager manager;
50 private File directory = new File("ActiveMQ");
51 private Properties properties;
52
53 /***
54 * Factory method to create an instance using the defaults
55 *
56 * @param directory the directory in which to store the persistent files
57 * @return
58 * @throws JMSException
59 */
60 public static JdbmPersistenceAdapter newInstance(File directory) throws JMSException {
61 return new JdbmPersistenceAdapter(directory);
62 }
63
64
65 public JdbmPersistenceAdapter() {
66 }
67
68 public JdbmPersistenceAdapter(File directory) {
69 this.directory = directory;
70 }
71
72 public JdbmPersistenceAdapter(RecordManager manager) {
73 this.manager = manager;
74 }
75
76 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
77 try {
78 BTree messageDb = createDatabase("Queue_" + destinationName);
79 BTree sequenceDb = createDatabase("Sequence_Queue_" + destinationName);
80 JdbmMessageStore messageStore = new JdbmMessageStore(messageDb, sequenceDb);
81 return messageStore;
82 }
83 catch (IOException e) {
84 throw JMSExceptionHelper.newJMSException("Failed to create a QueueMessageContainer for destination: " + destinationName + ". Reason: " + e, e);
85 }
86 }
87
88 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
89 try {
90 BTree messageDb = createDatabase("Topic_" + destinationName);
91 BTree sequenceDb = createDatabase("Sequence_Topic_" + destinationName);
92 BTree consumerAckDb = createDatabase("Consumer_Acks_Topic_" + destinationName);
93 BTree subscriberDb = createDatabase("Subscriber_" + destinationName);
94 BTree messageCountDb = createDatabase("MessageCount_Topic_" + destinationName);
95 JdbmTopicMessageStore messageStore = new JdbmTopicMessageStore(messageDb, sequenceDb, consumerAckDb, subscriberDb, messageCountDb);
96 return messageStore;
97 }
98 catch (IOException e) {
99 throw JMSExceptionHelper.newJMSException("Failed to create a TopicMessageContainer for destination: " + destinationName + ". Reason: " + e, e);
100 }
101 }
102
103 public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
104 try {
105 return new JdbmPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
106 }
107 catch (IOException e) {
108 throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
109 }
110 }
111
112 public void beginTransaction() {
113 }
114
115 public void commitTransaction() throws JMSException {
116 try {
117 manager.commit();
118 }
119 catch (IOException e) {
120 throw JMSExceptionHelper.newJMSException("Could not commit transaction. Reason: " + e, e);
121 }
122 }
123
124 public void rollbackTransaction() {
125 try {
126 manager.rollback();
127 }
128 catch (IOException e) {
129 log.error("Could not rollback transaction. Reason: " + e, e);
130 }
131 }
132
133 public void start() throws JMSException {
134 if (manager == null) {
135 directory.mkdirs();
136
137 log.info("Creating JDBM based message store in directory: " + directory.getAbsolutePath());
138
139 try {
140 String name = directory.getAbsolutePath() + "/Store";
141 if (properties != null) {
142 manager = RecordManagerFactory.createRecordManager(name, properties);
143 }
144 else {
145 manager = RecordManagerFactory.createRecordManager(name);
146 }
147 }
148 catch (IOException e) {
149 throw JMSExceptionHelper.newJMSException("Failed to create JDBM persistent store at directory: "
150 + directory + ". Reason: " + e, e);
151 }
152 }
153 }
154
155 public synchronized void stop() throws JMSException {
156 if (manager != null) {
157 try {
158 manager.close();
159 }
160 catch (IOException e) {
161 throw JMSExceptionHelper.newJMSException("Failed to close PersistenceAdapter. Reason: " + e, e);
162 }
163 finally {
164 manager = null;
165 }
166 }
167 }
168
169
170
171 public RecordManager getManager() {
172 return manager;
173 }
174
175 public void setManager(RecordManager manager) {
176 this.manager = manager;
177 }
178
179 public File getDirectory() {
180 return directory;
181 }
182
183 public void setDirectory(File directory) {
184 this.directory = directory;
185 }
186
187
188
189 public synchronized BTree createDatabase(String name) throws IOException, AlreadyClosedException {
190 if (manager == null) {
191 throw new AlreadyClosedException("JDBM PersistenceAdapter");
192 }
193
194
195 long recid = manager.getNamedObject(name);
196 BTree tree = null;
197 if (recid != 0) {
198 tree = BTree.load(manager, recid);
199 }
200 else {
201 Comparator comparator = new DefaultComparator();
202
203 tree = BTree.createInstance(manager, comparator);
204 manager.setNamedObject(name, tree.getRecid());
205 }
206 return tree;
207 }
208
209 }