1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.store.jdbc;
20
21 import java.sql.Connection;
22 import java.sql.SQLException;
23
24 import javax.jms.JMSException;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.message.ActiveMQMessage;
29 import org.codehaus.activemq.message.ConsumerInfo;
30 import org.codehaus.activemq.message.MessageAck;
31 import org.codehaus.activemq.message.WireFormat;
32 import org.codehaus.activemq.service.MessageContainer;
33 import org.codehaus.activemq.service.MessageIdentity;
34 import org.codehaus.activemq.service.SubscriberEntry;
35 import org.codehaus.activemq.service.Subscription;
36 import org.codehaus.activemq.store.TopicMessageStore;
37 import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
38 import org.codehaus.activemq.util.JMSExceptionHelper;
39
40 /***
41 * @version $Revision: 1.2 $
42 */
43 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
44
45 private static final Log log = LogFactory.getLog(JDBCTopicMessageStore.class);
46 private MessageContainer container;
47
48 public JDBCTopicMessageStore(JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
49 super(adapter, wireFormat, destinationName);
50 }
51
52 public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
53 long seq = ((Long)messageIdentity.getSequenceNumber()).longValue();
54
55 try {
56 Connection c = TransactionContext.getConnection();
57 adapter.doSetLastAck(c, subscription.getPersistentKey(), destinationName, seq);
58 } catch (SQLException e) {
59 throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message "+messageIdentity+" in container: " + e, e);
60 }
61 }
62
63 /***
64 * @see org.codehaus.activemq.store.TopicMessageStore#getLastestMessageIdentity()
65 */
66 public MessageIdentity getLastestMessageIdentity() throws JMSException {
67 return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
68 }
69
70 /***
71 * @see org.codehaus.activemq.store.TopicMessageStore#recoverSubscription(org.codehaus.activemq.service.Subscription, org.codehaus.activemq.service.MessageIdentity)
72 */
73 public void recoverSubscription(final Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
74
75 try {
76 Connection c = TransactionContext.getConnection();
77 adapter.doRecoverSubscription(c, subscription.getPersistentKey(), destinationName, new MessageListResultHandler(){
78 public void onMessage(long seq, String messageID) throws JMSException {
79 MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
80 ActiveMQMessage message = getMessage(messageIdentity);
81 subscription.addMessage(container, message);
82 }
83 });
84 } catch (SQLException e) {
85 throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
86 }
87 }
88
89 /***
90 * @see org.codehaus.activemq.store.TopicMessageStore#setSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo, org.codehaus.activemq.service.SubscriberEntry)
91 */
92 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
93 String key = info.getConsumerKey();
94 try {
95 Connection c = TransactionContext.getConnection();
96 adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
97 } catch (SQLException e) {
98 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
99 }
100 }
101
102 /***
103 * @see org.codehaus.activemq.store.TopicMessageStore#getSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo)
104 */
105 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
106 String key = info.getConsumerKey();
107 try {
108 Connection c = TransactionContext.getConnection();
109 return adapter.doGetSubscriberEntry(c, destinationName, key);
110 } catch (SQLException e) {
111 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
112 }
113 }
114
115
116 /***
117 * @see org.codehaus.activemq.store.TopicMessageStore#setMessageContainer(org.codehaus.activemq.service.MessageContainer)
118 */
119 public void setMessageContainer(MessageContainer container) {
120 this.container=container;
121 }
122 public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
123 }
124 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
125 }
126 }