View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbc;
19  
20  import java.io.IOException;
21  import java.sql.Connection;
22  import java.sql.SQLException;
23  
24  import javax.jms.JMSException;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.message.ActiveMQMessage;
29  import org.codehaus.activemq.message.MessageAck;
30  import org.codehaus.activemq.message.WireFormat;
31  import org.codehaus.activemq.service.MessageIdentity;
32  import org.codehaus.activemq.service.QueueMessageContainer;
33  import org.codehaus.activemq.store.MessageStore;
34  import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
35  import org.codehaus.activemq.util.JMSExceptionHelper;
36  
37  /***
38   * @version $Revision: 1.2 $
39   */
40  public class JDBCMessageStore implements MessageStore {
41      private static final Log log = LogFactory.getLog(JDBCMessageStore.class);
42      
43      protected final WireFormat wireFormat;
44      protected final String destinationName;
45      protected final SequenceGenerator sequenceGenerator;
46      protected final JDBCAdapter adapter;
47  
48      public JDBCMessageStore(JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
49          this.adapter = adapter;
50          this.sequenceGenerator = adapter.getSequenceGenerator();
51          this.wireFormat = wireFormat;
52          this.destinationName = destinationName;
53      }
54  
55      public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
56          
57          // Serialize the Message..
58          String messageID = message.getJMSMessageID();
59          byte data[];
60          try {
61              data = wireFormat.toBytes(message);
62          } catch (IOException e) {
63              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
64          }
65          
66          long seq=sequenceGenerator.getNextSequenceId();
67  
68          // Get a connection and insert the message into the DB.
69          try {
70              Connection c = TransactionContext.getConnection();            
71              adapter.doAddMessage(c, seq, messageID, destinationName, data);
72          } catch (SQLException e) {
73              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
74          }
75  
76          MessageIdentity answer = message.getJMSMessageIdentity();
77          answer.setSequenceNumber(new Long(seq));
78          return answer;
79      }
80  
81  
82      public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
83  
84          long id = ((Long)identity.getSequenceNumber()).longValue();
85          
86          // Get a connection and pull the messate out of the DB
87          try {
88              Connection c = TransactionContext.getConnection();            
89              byte data[] = adapter.doGetMessage(c, id);
90              return (ActiveMQMessage)wireFormat.fromBytes(data);            
91          } catch (IOException e) {
92              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
93          } catch (SQLException e) {
94              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
95          }
96      }
97  
98      public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
99          long seq = ((Long) identity.getSequenceNumber()).longValue();
100         
101         // Get a connection and remove the message from the DB
102         try {            
103             Connection c = TransactionContext.getConnection();            
104             adapter.doRemoveMessage(c, seq);
105         } catch (SQLException e) {
106             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
107         }
108     }
109 
110 
111     public void recover(final QueueMessageContainer container) throws JMSException {
112         
113         // Get all the Message ids out of the database.
114         try {
115             Connection c = TransactionContext.getConnection();            
116             adapter.doRecover(c, destinationName, new MessageListResultHandler() {
117                 public void onMessage(long seq, String messageID) throws JMSException {
118                     container.recoverMessageToBeDelivered(new MessageIdentity(messageID, new Long(seq)));                
119                 }
120             });     
121             
122         } catch (SQLException e) {
123             throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
124         } 
125     }
126 
127     public void start() throws JMSException {
128     }
129 
130     public void stop() throws JMSException {
131     }
132 }