1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.boundedvm;
20
21 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.codehaus.activemq.broker.BrokerClient;
24 import org.codehaus.activemq.filter.AndFilter;
25 import org.codehaus.activemq.filter.Filter;
26 import org.codehaus.activemq.filter.FilterFactory;
27 import org.codehaus.activemq.filter.FilterFactoryImpl;
28 import org.codehaus.activemq.filter.NoLocalFilter;
29 import org.codehaus.activemq.message.ActiveMQMessage;
30 import org.codehaus.activemq.message.ConsumerInfo;
31 import org.codehaus.activemq.message.MessageAck;
32 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
33 import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
34 import org.codehaus.activemq.service.MessageContainer;
35 import org.codehaus.activemq.service.MessageContainerManager;
36
37 import javax.jms.JMSException;
38 import java.util.Iterator;
39
40 /***
41 * A MessageContainerManager for transient topics
42 *
43 * @version $Revision: 1.6 $
44 */
45
46 /***
47 * A manager of MessageContainer instances
48 */
49 public class TransientTopicBoundedMessageManager implements MessageContainerManager {
50 private MemoryBoundedQueueManager queueManager;
51 private ConcurrentHashMap containers;
52 private FilterFactory filterFactory;
53 private SynchronizedBoolean started;
54
55 /***
56 * Constructor for TransientTopicBoundedMessageManager
57 *
58 * @param mgr
59 */
60 public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
61 this.queueManager = mgr;
62 this.containers = new ConcurrentHashMap();
63 this.filterFactory = new FilterFactoryImpl();
64 this.started = new SynchronizedBoolean(false);
65 }
66
67 /***
68 * start the manager
69 *
70 * @throws JMSException
71 */
72 public void start() throws JMSException {
73 if (started.commit(false, true)) {
74 for (Iterator i = containers.values().iterator(); i.hasNext();) {
75 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
76 container.start();
77 }
78 }
79 }
80
81 /***
82 * stop the manager
83 *
84 * @throws JMSException
85 */
86 public void stop() throws JMSException {
87 if (started.commit(true, false)) {
88 for (Iterator i = containers.values().iterator(); i.hasNext();) {
89 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
90 container.stop();
91 }
92 }
93 }
94
95 /***
96 * Add a consumer if appropiate
97 *
98 * @param client
99 * @param info
100 * @throws JMSException
101 */
102 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
103 if (info.getDestination().isTopic()) {
104 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
105 .get(client);
106 if (container == null) {
107 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
108 container = new TransientTopicBoundedMessageContainer(client, queue);
109 containers.put(client, container);
110 if (started.get()) {
111 container.start();
112 }
113 }
114 container.addConsumer(createFilter(info), info);
115 }
116 }
117
118 /***
119 * @param client
120 * @param info
121 * @throws JMSException
122 */
123 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
124 if (info.getDestination().isTopic()) {
125 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
126 .get(client);
127 if (container != null) {
128 container.removeConsumer(info);
129 if (container.isInactive()) {
130 containers.remove(client);
131 container.close();
132 }
133 }
134 }
135 }
136
137 /***
138 * Delete a durable subscriber
139 *
140 * @param clientId
141 * @param subscriberName
142 * @throws JMSException if the subscriber doesn't exist or is still active
143 */
144 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
145 }
146
147 /***
148 * @param client
149 * @param message
150 * @throws JMSException
151 */
152 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
153 if (message != null && message.getJMSActiveMQDestination().isTopic()) {
154 for (Iterator i = containers.values().iterator(); i.hasNext();) {
155 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
156 container.targetAndDispatch(message.shallowCopy());
157
158 }
159 }
160 }
161
162 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
163 }
164
165 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
166 }
167
168 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
169 }
170
171 public void poll() throws JMSException {
172 }
173
174 /***
175 * A hook when the transaction is about to be commited; so apply all outstanding commands to the Journal if using a
176 * Journal (transaction log)
177 *
178 * @param client
179 * @param transactionId
180 * @throws JMSException
181 */
182 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
183 }
184
185 /***
186 * A hook when the transaction is about to be rolled back; so discard all outstanding commands that are pending to
187 * be written to the Journal
188 *
189 * @param client
190 * @param transactionId
191 */
192 public void rollbackTransaction(BrokerClient client, String transactionId) {
193 }
194
195 public MessageContainer getContainer(String physicalName) {
196 return null;
197 }
198
199 /***
200 * Create filter for a Consumer
201 *
202 * @param info
203 * @return the Fitler
204 * @throws javax.jms.JMSException
205 */
206 protected Filter createFilter(ConsumerInfo info) throws JMSException {
207 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
208 if (info.isNoLocal()) {
209 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
210 }
211 return filter;
212 }
213 }