1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.codehaus.activemq.broker.BrokerClient;
22 import org.codehaus.activemq.filter.AndFilter;
23 import org.codehaus.activemq.filter.Filter;
24 import org.codehaus.activemq.filter.FilterFactory;
25 import org.codehaus.activemq.filter.FilterFactoryImpl;
26 import org.codehaus.activemq.filter.NoLocalFilter;
27 import org.codehaus.activemq.message.ActiveMQDestination;
28 import org.codehaus.activemq.message.ActiveMQMessage;
29 import org.codehaus.activemq.message.ConsumerInfo;
30 import org.codehaus.activemq.message.MessageAck;
31 import org.codehaus.activemq.service.Dispatcher;
32 import org.codehaus.activemq.service.MessageContainer;
33 import org.codehaus.activemq.service.Subscription;
34 import org.codehaus.activemq.service.SubscriptionContainer;
35 import org.codehaus.activemq.service.TopicMessageContainer;
36 import org.codehaus.activemq.store.PersistenceAdapter;
37
38 import javax.jms.DeliveryMode;
39 import javax.jms.IllegalStateException;
40 import javax.jms.JMSException;
41 import java.util.Iterator;
42 import java.util.Map;
43
44 /***
45 * A default Broker used for Topic messages for durable consumers
46 *
47 * @version $Revision: 1.15 $
48 */
49 public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
50 private PersistenceAdapter persistenceAdapter;
51 protected SubscriptionContainer subscriptionContainer;
52 protected FilterFactory filterFactory;
53 protected Map activeSubscriptions = new ConcurrentHashMap();
54 private boolean loadedMessageContainers;
55
56 public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
57 this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
58 }
59
60 public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
61 super(dispatcher);
62 this.persistenceAdapter = persistenceAdapter;
63 this.subscriptionContainer = subscriptionContainer;
64 this.filterFactory = filterFactory;
65 }
66
67 /***
68 * @param client
69 * @param info
70 * @throws javax.jms.JMSException
71 */
72 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
73 if (info.isDurableTopic()) {
74 doAddMessageConsumer(client, info);
75 }
76 }
77
78 /***
79 * @param client
80 * @param info
81 * @throws javax.jms.JMSException
82 */
83 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
84 subscriptionContainer.removeSubscription(info.getConsumerId());
85 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
86 if (sub != null) {
87 sub.setActive(false);
88 dispatcher.removeActiveSubscription(client, sub);
89 }
90 }
91
92 /***
93 * Delete a durable subscriber
94 *
95 * @param clientId
96 * @param subscriberName
97 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
98 */
99 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
100 boolean subscriptionFound = false;
101 for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) {
102 Subscription sub = (Subscription) i.next();
103 if (sub.getClientId().equals(clientId) && sub.getSubscriberName().equals(subscriberName)) {
104
105 if (sub.isActive()) {
106 throw new JMSException("The Consummer " + subscriberName + " is still active");
107 }
108 else {
109 subscriptionContainer.removeSubscription(sub.getConsumerId());
110 sub.clear();
111 subscriptionFound = true;
112 }
113 }
114 }
115 if (!subscriptionFound) {
116 throw new IllegalStateException("The Consumer " + subscriberName + " does not exist for client: " + clientId);
117 }
118 }
119
120 /***
121 * @param client
122 * @param message
123 * @throws javax.jms.JMSException
124 */
125 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
126 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
127 if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
128 MessageContainer container = getContainer(message.getJMSDestination().toString());
129
130
131 container.addMessage(message);
132 for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) {
133 Subscription sub = (Subscription) i.next();
134 if (sub.isTarget(message)) {
135 sub.addMessage(container, message);
136 }
137 }
138 }
139 }
140
141 /***
142 * Acknowledge a message as being read and consumed byh the Consumer
143 *
144 * @param client
145 * @param ack
146 * @throws javax.jms.JMSException
147 */
148 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
149 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
150 if (sub != null) {
151 sub.messageConsumed(ack);
152 }
153 }
154
155 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
156 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
157 if (sub != null) {
158 sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
159 }
160 }
161
162 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
163 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
164 if (sub != null) {
165
166 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
167 MessageContainer container = (MessageContainer) iter.next();
168 if (container.containsMessage(ack.getMessageIdentity())) {
169 sub.redeliverMessage(container, ack);
170
171
172 break;
173 }
174 }
175 }
176 }
177
178
179 /***
180 * poll or messages
181 *
182 * @throws javax.jms.JMSException
183 */
184 public void poll() throws JMSException {
185
186 }
187
188 public void commitTransaction(BrokerClient client, String transactionId) {
189 }
190
191 public void rollbackTransaction(BrokerClient client, String transactionId) {
192 }
193
194
195 public MessageContainer getContainer(String destinationName) throws JMSException {
196 TopicMessageContainer container = (TopicMessageContainer) messageContainers.get(destinationName);
197 if (container == null) {
198 container = persistenceAdapter.createTopicMessageContainer(destinationName);
199 container.start();
200 messageContainers.put(destinationName, container);
201 }
202 return container;
203 }
204
205
206
207
208
209 protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
210 boolean shouldRecover = false;
211 if (info.getConsumerName() != null && info.getClientId() != null) {
212 subscriptionContainer.checkForDuplicateDurableSubscription(client, info);
213 }
214 Subscription subscription = subscriptionContainer.getSubscription(info.getConsumerId());
215 if (subscription != null && subscription.isDurableTopic()) {
216
217 if (!subscription.getDestination().equals(subscription.getDestination()) || !subscription.getSelector().equals(info.getSelector())) {
218 subscriptionContainer.removeSubscription(info.getConsumerId());
219 subscription.clear();
220 subscription = subscriptionContainer.makeSubscription(dispatcher, info, createFilter(info));
221 }
222 }
223 else {
224 subscription = subscriptionContainer.makeSubscription(dispatcher, info, createFilter(info));
225 shouldRecover = true;
226 }
227 subscription.setActiveConsumer(info);
228 activeSubscriptions.put(info.getConsumerId(), subscription);
229
230 dispatcher.addActiveSubscription(client, subscription);
231
232 if (shouldRecover) {
233 recoverSubscriptions(subscription);
234 }
235
236
237
238
239
240
241 subscription.setActive(true);
242
243 }
244
245 /***
246 * This method is called when a new durable subscription is started and
247 * so we need to go through each matching message container
248 * and dispatch any matching messages that may be outstanding
249 *
250 * @param subscription
251 */
252 protected void recoverSubscriptions(Subscription subscription) throws JMSException {
253
254 if (subscription.isWildcard()) {
255 synchronized (this) {
256 if (!loadedMessageContainers) {
257 loadAllMessageContainers();
258 loadedMessageContainers = true;
259 }
260 }
261 }
262 else {
263
264 getContainer(subscription.getDestination().getPhysicalName());
265 }
266 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
267 TopicMessageContainer container = (TopicMessageContainer) iter.next();
268 container.recoverSubscription(subscription);
269 }
270 }
271
272
273 /***
274 * Called when recovering a wildcard subscription
275 * where we need to load all the durable message containers
276 * (for which we have any outstanding messages to deliver) into RAM
277 */
278 protected void loadAllMessageContainers() {
279 /*** TODO */
280 }
281
282 /***
283 * Create filter for a Consumer
284 *
285 * @param info
286 * @return the Fitler
287 * @throws javax.jms.JMSException
288 */
289 protected Filter createFilter(ConsumerInfo info) throws JMSException {
290 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
291 if (info.isNoLocal()) {
292 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
293 }
294 return filter;
295 }
296
297
298 }