001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service.boundedvm; 020 import java.util.ArrayList; 021 import java.util.Iterator; 022 import java.util.List; 023 024 import javax.jms.JMSException; 025 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.activemq.broker.BrokerClient; 029 import org.activemq.filter.Filter; 030 import org.activemq.io.util.MemoryBoundedQueue; 031 import org.activemq.message.ActiveMQDestination; 032 import org.activemq.message.ActiveMQMessage; 033 import org.activemq.message.ConsumerInfo; 034 import org.activemq.message.MessageAck; 035 import org.activemq.service.MessageContainer; 036 import org.activemq.service.MessageContainerAdmin; 037 import org.activemq.service.MessageIdentity; 038 import org.activemq.service.Service; 039 040 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 041 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 042 043 /** 044 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic 045 * messages 046 * 047 * @version $Revision: 1.1.1.1 $ 048 */ 049 public class TransientTopicBoundedMessageContainer 050 implements 051 MessageContainer, 052 Service, 053 Runnable, 054 MessageContainerAdmin { 055 private SynchronizedBoolean started; 056 private TransientTopicBoundedMessageManager manager; 057 private BrokerClient client; 058 private MemoryBoundedQueue queue; 059 private Thread worker; 060 private CopyOnWriteArrayList subscriptions; 061 private Log log; 062 063 /** 064 * Construct this beast 065 * 066 * @param manager 067 * @param client 068 * @param queue 069 */ 070 public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client, 071 MemoryBoundedQueue queue) { 072 this.manager = manager; 073 this.client = client; 074 this.queue = queue; 075 this.started = new SynchronizedBoolean(false); 076 this.subscriptions = new CopyOnWriteArrayList(); 077 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client); 078 } 079 080 /** 081 * @return true if this Container has no active subscriptions 082 */ 083 public boolean isInactive() { 084 return subscriptions.isEmpty(); 085 } 086 087 /** 088 * @return the BrokerClient this Container is dispatching to 089 */ 090 public BrokerClient getBrokerClient() { 091 return client; 092 } 093 094 /** 095 * Add a consumer to dispatch messages to 096 * 097 * @param filter 098 * @param info 099 */ 100 public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) { 101 TransientTopicSubscription ts = findMatch(info); 102 if (ts == null) { 103 ts = new TransientTopicSubscription(filter, info, client); 104 subscriptions.add(ts); 105 } 106 return ts; 107 } 108 109 /** 110 * Remove a consumer 111 * 112 * @param info 113 */ 114 public void removeConsumer(ConsumerInfo info) { 115 TransientTopicSubscription ts = findMatch(info); 116 if (ts != null) { 117 subscriptions.remove(ts); 118 } 119 } 120 121 /** 122 * start working 123 */ 124 public void start() { 125 if (started.commit(false, true)) { 126 if (manager.isDecoupledDispatch()) { 127 worker = new Thread(this, "TransientTopicDispatcher"); 128 worker.setPriority(Thread.NORM_PRIORITY + 2); 129 worker.start(); 130 } 131 } 132 } 133 134 /** 135 * See if this container should get this message and dispatch it 136 * 137 * @param sender the BrokerClient the message came from 138 * @param message 139 * @return true if it is a valid container 140 * @throws JMSException 141 */ 142 public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException { 143 boolean result = false; 144 if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) { 145 List tmpList = null; 146 for (Iterator i = subscriptions.iterator();i.hasNext();) { 147 TransientTopicSubscription ts = (TransientTopicSubscription) i.next(); 148 149 if (!(ts.client.getChannel() != null && ts.client.getChannel().isMulticast() && ts.client.equals(sender)) && ts.isTarget(message)) { 150 if (tmpList == null) { 151 tmpList = new ArrayList(); 152 } 153 tmpList.add(ts); 154 } 155 } 156 dispatchToQueue(message, tmpList); 157 result = tmpList != null; 158 } 159 return result; 160 } 161 162 /** 163 * stop working 164 */ 165 public void stop() { 166 started.set(false); 167 queue.clear(); 168 } 169 170 /** 171 * close down this container 172 */ 173 public void close() { 174 if (started.get()) { 175 stop(); 176 } 177 queue.close(); 178 } 179 180 181 /** 182 * do some dispatching 183 */ 184 public void run() { 185 int count = 0; 186 ActiveMQMessage message = null; 187 while (started.get()) { 188 try { 189 message = (ActiveMQMessage) queue.dequeue(2000); 190 if (message != null) { 191 if (!message.isExpired()) { 192 client.dispatch(message); 193 if (++count == 250) { 194 count = 0; 195 Thread.yield(); 196 } 197 }else { 198 if (log.isDebugEnabled()){ 199 log.debug("Message: " + message + " has expired"); 200 } 201 } 202 } 203 } 204 catch (Exception e) { 205 stop(); 206 log.warn("stop dispatching", e); 207 } 208 } 209 } 210 211 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException { 212 if (list != null && !list.isEmpty()) { 213 int[] ids = new int[list.size()]; 214 for (int i = 0;i < list.size();i++) { 215 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i); 216 ids[i] = ts.getConsumerInfo().getConsumerNo(); 217 } 218 message = message.shallowCopy(); 219 message.setConsumerNos(ids); 220 if (manager.isDecoupledDispatch()) { 221 queue.enqueue(message); 222 } 223 else { 224 client.dispatch(message); 225 } 226 } 227 } 228 229 private TransientTopicSubscription findMatch(ConsumerInfo info) { 230 TransientTopicSubscription result = null; 231 for (Iterator i = subscriptions.iterator();i.hasNext();) { 232 TransientTopicSubscription ts = (TransientTopicSubscription) i.next(); 233 if (ts.getConsumerInfo().equals(info)) { 234 result = ts; 235 break; 236 } 237 } 238 return result; 239 } 240 241 /** 242 * @param destination 243 * @return true if a 244 */ 245 public boolean hasConsumerFor(ActiveMQDestination destination) { 246 for (Iterator i = subscriptions.iterator();i.hasNext();) { 247 TransientTopicSubscription ts = (TransientTopicSubscription) i.next(); 248 ConsumerInfo info = ts.getConsumerInfo(); 249 if (info.getDestination().matches(destination)) { 250 return true; 251 } 252 } 253 return false; 254 } 255 256 /** 257 * @return the destination name 258 */ 259 public String getDestinationName() { 260 return ""; 261 } 262 263 /** 264 * @param msg 265 * @return @throws JMSException 266 */ 267 public void addMessage(ActiveMQMessage msg) throws JMSException { 268 } 269 270 /** 271 * @param messageIdentity 272 * @param ack 273 * @throws JMSException 274 */ 275 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException { 276 } 277 278 /** 279 * @param messageIdentity 280 * @return @throws JMSException 281 */ 282 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException { 283 return null; 284 } 285 286 /** 287 * @param messageIdentity 288 * @throws JMSException 289 */ 290 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException { 291 } 292 293 /** 294 * @param messageIdentity 295 * @param ack 296 * @throws JMSException 297 */ 298 public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException { 299 } 300 301 /** 302 * @param messageIdentity 303 * @return @throws JMSException 304 */ 305 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException { 306 return false; 307 } 308 309 /** 310 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin() 311 */ 312 public MessageContainerAdmin getMessageContainerAdmin() { 313 return this; 314 } 315 316 /** 317 * @see org.activemq.service.MessageContainerAdmin#empty() 318 */ 319 public void empty() throws JMSException { 320 // TODO implement me 321 } 322 323 /** 324 * @see org.activemq.service.MessageContainer#isDeadLetterQueue() 325 */ 326 public boolean isDeadLetterQueue() { 327 return false; 328 } 329 }