/***
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/

package org.codehaus.activemq.service.boundedvm;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.service.Service;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * A MessageContainer for transient topics. One of these exists for every active Connection consuming transient
 * Topic messages.
 * <p>
 * Messages that match at least one registered subscription are placed on the supplied queue by
 * {@link #targetAndDispatch(ActiveMQMessage)}; a worker thread created by {@link #start()} takes them off the
 * queue again and hands them to the client.
 *
 * @version $Revision: 1.2 $
 */
public class TransientTopicBoundedMessageContainer implements Service, Runnable {
    private final SynchronizedBoolean started;
    private final BrokerClient client;
    private final BoundedPacketQueue queue;
    private Thread worker;
    private final CopyOnWriteArrayList subscriptions;
    private final Log log;

    /**
     * Construct a container that is initially stopped and has no subscriptions.
     *
     * @param client the client that dequeued messages are dispatched to
     * @param queue the queue used to hand messages from the publishing side to the worker thread
     */
    public TransientTopicBoundedMessageContainer(BrokerClient client, BoundedPacketQueue queue) {
        this.client = client;
        this.queue = queue;
        this.started = new SynchronizedBoolean(false);
        this.subscriptions = new CopyOnWriteArrayList();
        this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer for client: " + client);
    }

    /**
     * @return true if this Container has no active subscriptions
     */
    public boolean isInactive() {
        return subscriptions.isEmpty();
    }

    /**
     * Add a consumer to dispatch messages to. Nothing is added if a subscription whose ConsumerInfo equals
     * <code>info</code> is already registered.
     *
     * @param filter the filter passed to the new subscription
     * @param info the consumer to register
     */
    public void addConsumer(Filter filter, ConsumerInfo info) {
        if (findMatch(info) == null) {
            subscriptions.add(new TransientTopicSubscription(filter, info));
        }
    }

    /**
     * Remove a consumer. Does nothing if no subscription is registered for <code>info</code>.
     *
     * @param info the consumer to remove
     */
    public void removeConsumer(ConsumerInfo info) {
        TransientTopicSubscription existing = findMatch(info);
        if (existing != null) {
            subscriptions.remove(existing);
        }
    }

    /**
     * Start working: if the container is not already started, create and start the worker thread that runs
     * {@link #run()}.
     */
    public void start() {
        // commit(false, true) only succeeds for the caller that flips the flag, so a second start() is a no-op
        if (started.commit(false, true)) {
            worker = new Thread(this);
            worker.setPriority(Thread.NORM_PRIORITY + 1);
            worker.start();
        }
    }

    /**
     * See if this container should get this message and, if so, dispatch it.
     * <p>
     * When at least one subscription targets the message, the consumer numbers of all matching subscriptions
     * are set on the message before it is enqueued.
     *
     * @param message the message to test against the registered subscriptions
     * @return true if at least one subscription of this container targets the message
     * @throws JMSException
     */
    public boolean targetAndDispatch(ActiveMQMessage message) throws JMSException {
        // allocated lazily so that a message nobody wants costs no allocation
        List matched = null;
        for (Iterator i = subscriptions.iterator(); i.hasNext();) {
            TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
            if (ts.isTarget(message)) {
                if (matched == null) {
                    matched = new ArrayList();
                }
                matched.add(ts);
            }
        }
        dispatchToQueue(message, matched);
        return matched != null;
    }

    /**
     * Stop working: clear the started flag, which ends the loop in {@link #run()}, and clear the queue.
     */
    public void stop() {
        started.set(false);
        queue.clear();
    }

    /**
     * Close down this container: stop it if it is started, then close the queue.
     */
    public void close() {
        if (started.get()) {
            stop();
        }
        queue.close();
    }

    /**
     * Do some dispatching: while the container is started, take messages off the queue and dispatch them to the
     * client. Any exception stops the container and is logged.
     */
    public void run() {
        int count = 0;
        while (started.get()) {
            try {
                // timed dequeue so the started flag is re-checked even when no messages arrive
                ActiveMQMessage message = (ActiveMQMessage) queue.dequeue(2000);
                if (message != null) {
                    client.dispatch(message);
                    // give other threads a chance after every 250 dispatched messages
                    if (++count == 250) {
                        count = 0;
                        Thread.yield();
                    }
                }
            }
            catch (Exception e) {
                stop();
                log.warn("stop dispatching", e);
                if (e instanceof InterruptedException) {
                    // do not swallow the interrupt: restore the flag for whoever is running this Runnable
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Stamp the message with the consumer numbers of the given subscriptions and enqueue it. If the enqueue is
     * interrupted the container is closed and the calling thread's interrupt status is restored.
     *
     * @param message the message to enqueue
     * @param list the matching TransientTopicSubscription instances, or null if there were none (in which case
     * nothing is done)
     * @throws JMSException
     */
    private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
        if (list != null) {
            int size = list.size();
            int[] ids = new int[size];
            for (int i = 0; i < size; i++) {
                TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
                ids[i] = ts.getConsumerInfo().getConsumerNo();
            }
            message.setConsumerNos(ids);
            try {
                queue.enqueue(message);
            }
            catch (InterruptedException e) {
                log.warn("queue interuppted, closing", e);
                try {
                    close();
                }
                finally {
                    // restore the interrupt status only after close() so the caller can still observe it
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Find the registered subscription whose ConsumerInfo equals the one supplied.
     *
     * @param info the consumer to look for
     * @return the first matching subscription, or null if there is none
     */
    private TransientTopicSubscription findMatch(ConsumerInfo info) {
        for (Iterator i = subscriptions.iterator(); i.hasNext();) {
            TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
            if (ts.getConsumerInfo().equals(info)) {
                return ts;
            }
        }
        return null;
    }
}