1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbc;
19
20 import java.io.IOException;
21 import java.sql.Connection;
22 import java.sql.SQLException;
23
24 import javax.jms.JMSException;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.message.ActiveMQMessage;
29 import org.codehaus.activemq.message.MessageAck;
30 import org.codehaus.activemq.message.WireFormat;
31 import org.codehaus.activemq.service.MessageIdentity;
32 import org.codehaus.activemq.service.QueueMessageContainer;
33 import org.codehaus.activemq.store.MessageStore;
34 import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
35 import org.codehaus.activemq.util.JMSExceptionHelper;
36
37 /***
38 * @version $Revision: 1.2 $
39 */
40 public class JDBCMessageStore implements MessageStore {
41 private static final Log log = LogFactory.getLog(JDBCMessageStore.class);
42
43 protected final WireFormat wireFormat;
44 protected final String destinationName;
45 protected final SequenceGenerator sequenceGenerator;
46 protected final JDBCAdapter adapter;
47
48 public JDBCMessageStore(JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
49 this.adapter = adapter;
50 this.sequenceGenerator = adapter.getSequenceGenerator();
51 this.wireFormat = wireFormat;
52 this.destinationName = destinationName;
53 }
54
55 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
56
57
58 String messageID = message.getJMSMessageID();
59 byte data[];
60 try {
61 data = wireFormat.toBytes(message);
62 } catch (IOException e) {
63 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
64 }
65
66 long seq=sequenceGenerator.getNextSequenceId();
67
68
69 try {
70 Connection c = TransactionContext.getConnection();
71 adapter.doAddMessage(c, seq, messageID, destinationName, data);
72 } catch (SQLException e) {
73 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
74 }
75
76 MessageIdentity answer = message.getJMSMessageIdentity();
77 answer.setSequenceNumber(new Long(seq));
78 return answer;
79 }
80
81
82 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
83
84 long id = ((Long)identity.getSequenceNumber()).longValue();
85
86
87 try {
88 Connection c = TransactionContext.getConnection();
89 byte data[] = adapter.doGetMessage(c, id);
90 return (ActiveMQMessage)wireFormat.fromBytes(data);
91 } catch (IOException e) {
92 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
93 } catch (SQLException e) {
94 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
95 }
96 }
97
98 public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
99 long seq = ((Long) identity.getSequenceNumber()).longValue();
100
101
102 try {
103 Connection c = TransactionContext.getConnection();
104 adapter.doRemoveMessage(c, seq);
105 } catch (SQLException e) {
106 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
107 }
108 }
109
110
111 public void recover(final QueueMessageContainer container) throws JMSException {
112
113
114 try {
115 Connection c = TransactionContext.getConnection();
116 adapter.doRecover(c, destinationName, new MessageListResultHandler() {
117 public void onMessage(long seq, String messageID) throws JMSException {
118 container.recoverMessageToBeDelivered(new MessageIdentity(messageID, new Long(seq)));
119 }
120 });
121
122 } catch (SQLException e) {
123 throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
124 }
125 }
126
127 public void start() throws JMSException {
128 }
129
130 public void stop() throws JMSException {
131 }
132 }