View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.howl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQMessage;
23  import org.codehaus.activemq.message.MessageAck;
24  import org.codehaus.activemq.message.Packet;
25  import org.codehaus.activemq.message.WireFormat;
26  import org.codehaus.activemq.service.MessageIdentity;
27  import org.codehaus.activemq.service.QueueMessageContainer;
28  import org.codehaus.activemq.store.MessageStore;
29  import org.codehaus.activemq.util.Callback;
30  import org.codehaus.activemq.util.JMSExceptionHelper;
31  import org.codehaus.activemq.util.TransactionTemplate;
32  import org.objectweb.howl.log.LogConfigurationException;
33  import org.objectweb.howl.log.LogException;
34  import org.objectweb.howl.log.LogRecord;
35  import org.objectweb.howl.log.Logger;
36  import org.objectweb.howl.log.ReplayListener;
37  
38  import javax.jms.JMSException;
39  import java.io.IOException;
40  import java.util.LinkedHashMap;
41  import java.util.Map;
42  
43  /***
44   * An implementation of {@link MessageStore} designed for
45   * optimal use with <a href="http://howl.objectweb.org/">Howl</a>
46   * as the transaction log and then checkpointing asynchronously
47   * on a timeout with some other persistent storage.
48   *
49   * @version $Revision: 1.2 $
50   */
51  public class HowlMessageStore implements MessageStore {
52  
53      private static final int DEFAULT_RECORD_SIZE = 64 * 1024;
54      private static final Log log = LogFactory.getLog(HowlMessageStore.class);
55  
56      private HowlPersistenceAdapter longTermPersistence;
57      private MessageStore longTermStore;
58      private Logger transactionLog;
59      private WireFormat wireFormat;
60      private TransactionTemplate transactionTemplate;
61      private int maximumCacheSize = 100;
62      private Map map = new LinkedHashMap();
63      private boolean sync = true;
64      private long lastLogMark;
65      private Exception firstException;
66  
67      public HowlMessageStore(HowlPersistenceAdapter adapter, MessageStore checkpointStore, Logger transactionLog, WireFormat wireFormat) {
68          this.longTermPersistence = adapter;
69          this.longTermStore = checkpointStore;
70          this.transactionLog = transactionLog;
71          this.wireFormat = wireFormat;
72          this.transactionTemplate = new TransactionTemplate(adapter);
73      }
74  
75      /***
76       * This method is synchronized to ensure that only 1 thread can write to the log and cache
77       * and possibly checkpoint at once, to preserve order across the transaction log,
78       * cache and checkpointStore.
79       */
80      public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
81          // write to howl
82          writePacket(message);
83  
84          // can we add it to the cache?
85          if (!addMessageToCache(message)) {
86              log.warn("Not enough RAM to store the active transaction log and so we're having to force" +
87                      "a checkpoint so that we can ensure that reads are efficient and do not have to " +
88                      "replay the transaction log");
89              checkpoint(message);
90  
91              // now lets add the current message to the checkpoint store
92              longTermStore.addMessage(message);
93          }
94          return message.getJMSMessageIdentity();
95      }
96  
97      /***
98       * Lets ensure that readers don't block writers so there only synchronization on
99       * the cache and checkpointStore.
100      */
101     public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
102         ActiveMQMessage answer = null;
103         synchronized (map) {
104             answer = (ActiveMQMessage) map.get(identity.getMessageID());
105         }
106         if (answer == null) {
107             answer = longTermStore.getMessage(identity);
108         }
109         return answer;
110     }
111 
112     /***
113      * Removes can be done in any order so we only synchronize on the cache and
114      * checkpointStore
115      */
116     public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
117         // write to howl
118         writePacket(ack);
119 
120         synchronized (map) {
121             map.remove(identity.getMessageID());
122         }
123         longTermPersistence.onMessageRemove(this);
124     }
125 
126     /***
127      * Replays the checkpointStore first as those messages are the oldest ones,
128      * then messages are replayed from the transaction log and then
129      * the cache is updated.
130      *
131      * @param container
132      * @throws JMSException
133      */
134     public synchronized void recover(final QueueMessageContainer container) throws JMSException {
135         longTermStore.recover(container);
136 
137         // replay the transaction log, updating the cache and adding any messages to be dispatched
138         // to the container
139         firstException = null;
140         try {
141             transactionLog.replay(new ReplayListener() {
142                 LogRecord record = new LogRecord(DEFAULT_RECORD_SIZE);
143 
144                 public void onRecord(LogRecord logRecord) {
145                     readPacket(logRecord, container);
146                 }
147 
148                 public void onError(LogException e) {
149                     log.error("Error while recovering Howl transaction log: " + e, e);
150                 }
151 
152                 public LogRecord getLogRecord() {
153                     return record;
154                 }
155             });
156         }
157         catch (LogConfigurationException e) {
158             throw createRecoveryFailedException(e);
159         }
160         if (firstException != null) {
161             if (firstException instanceof JMSException) {
162                 throw (JMSException) firstException;
163             }
164             else {
165                 throw createRecoveryFailedException(firstException);
166             }
167         }
168     }
169 
170     public synchronized void start() throws JMSException {
171         longTermStore.start();
172     }
173 
174     public synchronized void stop() throws JMSException {
175         longTermStore.stop();
176     }
177 
178     /***
179      * Writes the current RAM cache to the long term, checkpoint store so that the
180      * transaction log can be truncated.
181      */
182     public synchronized void checkpoint() throws JMSException {
183         checkpoint(null);
184     }
185 
186 
187     // Properties
188     //-------------------------------------------------------------------------
189     public int getMaximumCacheSize() {
190         return maximumCacheSize;
191     }
192 
193     public void setMaximumCacheSize(int maximumCacheSize) {
194         this.maximumCacheSize = maximumCacheSize;
195     }
196 
197     // Implementation methods
198     //-------------------------------------------------------------------------
199 
200     /***
201      * Writes the current RAM image of the transaction log to stable, checkpoint store
202      *
203      * @param message is an optional message. This is null for timer based
204      *                checkpoints or is the message which cannot fit into the cache if cache-exhaustion
205      *                based checkpoints
206      * @throws JMSException
207      */
208     protected void checkpoint(final ActiveMQMessage message) throws JMSException {
209         // lets create a copy of the collection to avoid blocking readers
210         ActiveMQMessage[] temp = null;
211         synchronized (map) {
212             temp = new ActiveMQMessage[map.size()];
213             map.values().toArray(temp);
214 
215             // lets clear the map so that its next contents represent
216             // the stuff we need to checkpoint next time around
217             map.clear();
218         }
219 
220         final ActiveMQMessage[] data = temp;
221         transactionTemplate.run(new Callback() {
222             public void execute() throws Throwable {
223                 for (int i = 0, size = data.length; i < size; i++) {
224                     longTermStore.addMessage(data[i]);
225                 }
226                 if (message != null) {
227                     longTermStore.addMessage(message);
228                 }
229             }
230         });
231 
232         try {
233             transactionLog.mark(lastLogMark);
234         }
235         catch (Exception e) {
236             throw JMSExceptionHelper.newJMSException("Failed to checkpoint the Howl transaction log: " + e, e);
237         }
238     }
239 
240     /***
241      * Adds the given message to the cache if there is spare capacity
242      *
243      * @param message
244      * @return true if the message was added to the cache or false
245      */
246     protected boolean addMessageToCache(ActiveMQMessage message) {
247         synchronized (map) {
248             if (map.size() < maximumCacheSize && longTermPersistence.hasCacheCapacity(this)) {
249                 map.put(message.getJMSMessageID(), message);
250                 return true;
251             }
252         }
253         return false;
254     }
255 
256     protected void readPacket(LogRecord logRecord, QueueMessageContainer container) {
257         if (!logRecord.isCTRL() && !logRecord.isEOB() && logRecord.length > 0) {
258             try {
259                 // TODO for some wierd reason we get an unnecessary long which I'm guessing is the size
260                 Packet packet = wireFormat.fromBytes(logRecord.data, 2, logRecord.length - 2);
261                 if (packet instanceof ActiveMQMessage) {
262                     container.addMessage((ActiveMQMessage) packet);
263                 }
264                 else if (packet instanceof MessageAck) {
265                     MessageAck ack = (MessageAck) packet;
266                     container.delete(ack.getMessageIdentity(), ack);
267                 }
268                 else {
269                     log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
270                 }
271             }
272             catch (Exception e) {
273                 if (firstException == null) {
274                     firstException = e;
275                 }
276             }
277         }
278     }
279 
280     /***
281      * Writes a message to the transaction log using the current sync mode
282      */
283     protected synchronized void writePacket(Packet packet) throws JMSException {
284         try {
285             byte[] data = wireFormat.toBytes(packet);
286             lastLogMark = transactionLog.put(data, sync);
287         }
288         catch (IOException e) {
289             throw createWriteException(packet, e);
290         }
291         catch (LogException e) {
292             throw createWriteException(packet, e);
293         }
294         catch (InterruptedException e) {
295             throw createWriteException(packet, e);
296         }
297     }
298 
299 
300     protected JMSException createRecoveryFailedException(Exception e) {
301         return JMSExceptionHelper.newJMSException("Failed to recover from Howl transaction log. Reason: " + e, e);
302     }
303 
304     protected JMSException createWriteException(Packet packet, Exception e) {
305         return JMSExceptionHelper.newJMSException("Failed to write to Howl transaction log for: " + packet + ". Reason: " + e, e);
306     }
307 }