1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.bdb;
19
20 import com.sleepycat.je.Database;
21 import com.sleepycat.je.DatabaseEntry;
22 import com.sleepycat.je.DatabaseException;
23 import com.sleepycat.je.LockMode;
24 import com.sleepycat.je.OperationStatus;
25 import com.sleepycat.je.SecondaryConfig;
26 import com.sleepycat.je.SecondaryCursor;
27 import com.sleepycat.je.SecondaryDatabase;
28 import com.sleepycat.je.Transaction;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.message.ConsumerInfo;
33 import org.codehaus.activemq.message.MessageAck;
34 import org.codehaus.activemq.message.WireFormat;
35 import org.codehaus.activemq.service.MessageIdentity;
36 import org.codehaus.activemq.service.SubscriberEntry;
37 import org.codehaus.activemq.service.Subscription;
38 import org.codehaus.activemq.store.TopicMessageStore;
39 import org.codehaus.activemq.util.JMSExceptionHelper;
40
41 import javax.jms.JMSException;
42 import java.io.IOException;
43
44 /***
45 * @version $Revision: 1.1 $
46 */
47 public class BDbTopicMessageStore extends BDbMessageStore implements TopicMessageStore {
48 private static final Log log = LogFactory.getLog(BDbTopicMessageStore.class);
49
50 private Database subscriptionDatabase;
51
52 public BDbTopicMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat, Database subscriptionDatabase) {
53 super(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat);
54 this.subscriptionDatabase = subscriptionDatabase;
55 }
56
57 public void incrementMessageCount(MessageIdentity messageId) {
58 /*** TODO */
59 }
60
61 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) {
62 /*** TODO */
63 }
64
65 public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
66 checkClosed();
67 try {
68 doSetLastAcknowledgedMessageIdentity(subscription, messageIdentity);
69 }
70 catch (DatabaseException e) {
71 throw JMSExceptionHelper.newJMSException("Failed to update last acknowledge messageID for : "
72 + messageIdentity + ". Reason: " + e, e);
73 }
74 }
75
76 public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
77 checkClosed();
78 SecondaryCursor cursor = null;
79 try {
80 DatabaseEntry lastAckKey = getLastAcknowledgedMessageID(subscription, lastDispatchedMessage);
81 if (lastAckKey != null) {
82 cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
83 DatabaseEntry valueEntry = new DatabaseEntry();
84 OperationStatus status = cursor.getSearchKey(lastAckKey, valueEntry, LockMode.DEFAULT);
85 if (status != OperationStatus.SUCCESS) {
86 log.error("Could not find the last acknowledged record for: " + subscription + ". Status: " + status);
87 }
88 else {
89 while (true) {
90
91 status = cursor.getNext(lastAckKey, valueEntry, LockMode.DEFAULT);
92 if (status != OperationStatus.SUCCESS) {
93 if (status != OperationStatus.NOTFOUND) {
94 log.warn("Strange result when iterating to end of collection: " + status);
95 }
96 break;
97 }
98
99 ActiveMQMessage message = extractMessage(valueEntry);
100 subscription.addMessage(getContainer(), message);
101 }
102 }
103 }
104 }
105 catch (DatabaseException e) {
106 throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: "
107 + subscription + ". Reason: " + e, e);
108 }
109 catch (IOException e) {
110 throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: "
111 + subscription + ". Reason: " + e, e);
112 }
113 finally {
114 if (cursor != null) {
115 try {
116 cursor.close();
117 }
118 catch (DatabaseException e) {
119 log.warn("Caught exception closing cursor: " + e, e);
120 }
121 }
122 }
123 }
124
125 public MessageIdentity getLastestMessageIdentity() throws JMSException {
126 checkClosed();
127 SecondaryCursor cursor = null;
128 try {
129 cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
130 DatabaseEntry keyEntry = new DatabaseEntry();
131 DatabaseEntry valueEntry = new DatabaseEntry();
132 OperationStatus status = cursor.getLast(keyEntry, valueEntry, LockMode.DEFAULT);
133 if (status == OperationStatus.SUCCESS) {
134 if (log.isDebugEnabled()) {
135 log.debug("Loaded last sequence number of: " + BDbHelper.longFromBytes(keyEntry.getData()));
136 }
137 return new MessageIdentity(null, keyEntry);
138 }
139 else if (status != OperationStatus.NOTFOUND) {
140 log.error("Could not find the last sequence number. Status: " + status);
141 }
142 return null;
143 }
144 catch (DatabaseException e) {
145 throw JMSExceptionHelper.newJMSException("Unable to load the last sequence number. Reason: " + e, e);
146 }
147 finally {
148 if (cursor != null) {
149 try {
150 cursor.close();
151 }
152 catch (DatabaseException e) {
153 log.warn("Caught exception closing cursor: " + e, e);
154 }
155 }
156 }
157
158 }
159
160 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
161 return null; /*** TODO */
162 }
163
164 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
165 /*** TODO */
166 }
167
168 public synchronized void stop() throws JMSException {
169 JMSException firstException = BDbPersistenceAdapter.closeDatabase(subscriptionDatabase, null);
170 subscriptionDatabase = null;
171 super.stop();
172 if (firstException != null) {
173 throw JMSExceptionHelper.newJMSException("Unable to close the subscription database: " + firstException, firstException);
174 }
175 }
176
177
178
179
180 protected DatabaseEntry getLastAcknowledgedMessageID(Subscription subscription, MessageIdentity lastDispatchedMessage) throws DatabaseException {
181 DatabaseEntry key = createKey(subscription.getPersistentKey());
182 DatabaseEntry value = new DatabaseEntry();
183 OperationStatus status = subscriptionDatabase.get(null, key, value, null);
184 if (status == OperationStatus.SUCCESS) {
185 return value;
186 }
187 else if (status == OperationStatus.NOTFOUND) {
188
189 if (lastDispatchedMessage != null) {
190 return doSetLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
191 }
192 }
193 else {
194 log.warn("Unexpected status return from querying lastAcknowledgeSequenceNumber for: " + subscription + " status: " + status);
195 }
196 return null;
197 }
198
199 protected DatabaseEntry doSetLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws DatabaseException {
200 Transaction transaction = BDbHelper.getTransaction();
201 DatabaseEntry key = createKey(subscription.getPersistentKey());
202 DatabaseEntry value = getSequenceNumberKey(messageIdentity);
203 subscriptionDatabase.put(transaction, key, value);
204 return value;
205 }
206 }