001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * Copyright 2005 Hiram Chirino 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 020 package org.activemq.service.boundedvm; 021 import org.activemq.broker.BrokerClient; 022 import org.activemq.filter.Filter; 023 import org.activemq.io.util.MemoryBoundedQueue; 024 import org.activemq.io.util.MemoryBoundedQueueManager; 025 import org.activemq.io.util.MemoryManageable; 026 import org.activemq.message.ActiveMQDestination; 027 import org.activemq.message.ActiveMQMessage; 028 import org.activemq.message.ConsumerInfo; 029 import org.activemq.service.DeadLetterPolicy; 030 import org.activemq.service.MessageContainerAdmin; 031 import org.activemq.service.MessageIdentity; 032 import org.activemq.service.QueueListEntry; 033 import org.activemq.service.RedeliveryPolicy; 034 import org.activemq.service.Service; 035 import org.activemq.service.TransactionManager; 036 import org.activemq.service.TransactionTask; 037 import org.activemq.service.impl.DefaultQueueList; 038 import org.activemq.store.MessageStore; 039 import org.activemq.store.RecoveryListener; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 import EDU.oswego.cs.dl.util.concurrent.Executor; 044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 045 046 import javax.jms.JMSException; 047 048 import java.util.HashMap; 049 import java.util.List; 050 import java.util.Map; 051 052 /** 053 * A MessageContainer for Durable queues 054 * 055 * @version $Revision: 1.1.1.1 $ 056 */ 057 public class DurableQueueBoundedMessageContainer implements Service, Runnable, MessageContainerAdmin { 058 059 private final MessageStore messageStore; 060 private final MemoryBoundedQueueManager queueManager; 061 private final ActiveMQDestination destination; 062 private final Executor threadPool; 063 private final DeadLetterPolicy deadLetterPolicy; 064 private final Log log; 065 private final MemoryBoundedQueue queue; 066 067 private final DefaultQueueList subscriptions = new DefaultQueueList(); 068 private final SynchronizedBoolean started = new SynchronizedBoolean(false); 069 private final SynchronizedBoolean running = new SynchronizedBoolean(false); 070 private final Object dispatchMutex = new Object(); 071 private final Object subscriptionsMutex = new Object(); 072 073 private long idleTimestamp; //length of time (ms) there have been no active subscribers 074 075 /** 076 * Construct this beast 077 * 078 * @param threadPool 079 * @param queueManager 080 * @param destination 081 * @param redeliveryPolicy 082 * @param deadLetterPolicy 083 */ 084 public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor threadPool, MemoryBoundedQueueManager queueManager, 085 ActiveMQDestination destination,RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) { 086 this.messageStore = messageStore; 087 this.threadPool = threadPool; 088 this.queueManager = queueManager; 089 this.destination = destination; 090 this.deadLetterPolicy = deadLetterPolicy; 091 092 this.queue = queueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + destination.getPhysicalName()); 093 this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + destination); 094 } 095 096 097 /** 098 * @return true if there are subscribers waiting for messages 099 */ 100 public boolean isActive(){ 101 return !subscriptions.isEmpty(); 102 } 103 104 /** 105 * @return true if no messages are enqueued 106 */ 107 public boolean isEmpty(){ 108 return queue.isEmpty(); 109 } 110 111 /** 112 * @return the timestamp (ms) from the when the last active subscriber stopped 113 */ 114 public long getIdleTimestamp(){ 115 return idleTimestamp; 116 } 117 118 119 120 /** 121 * Add a consumer to dispatch messages to 122 * 123 * @param filter 124 * @param info 125 * @param client 126 * @return DurableQueueSubscription 127 * @throws JMSException 128 */ 129 public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client) 130 throws JMSException { 131 DurableQueueSubscription ts = findMatch(info); 132 if (ts == null) { 133 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue("DURABLE_SUB:-"+info.getConsumerId()); 134 MemoryBoundedQueue ackQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-"+info.getConsumerId()); 135 ts = new DurableQueueSubscription(client, queue, ackQueue, filter, info); 136 synchronized (subscriptionsMutex) { 137 idleTimestamp = 0; 138 subscriptions.add(ts); 139 checkRunning(); 140 } 141 } 142 return ts; 143 } 144 145 /** 146 * Remove a consumer 147 * 148 * @param info 149 * @throws JMSException 150 */ 151 public void removeConsumer(ConsumerInfo info) throws JMSException { 152 synchronized (subscriptionsMutex) { 153 DurableQueueSubscription ts = findMatch(info); 154 if (ts != null) { 155 156 subscriptions.remove(ts); 157 if (subscriptions.isEmpty()) { 158 running.commit(true, false); 159 idleTimestamp = System.currentTimeMillis(); 160 } 161 162 // get unacknowledged messages and re-enqueue them 163 List list = ts.getUndeliveredMessages(); 164 for (int i = list.size() - 1; i >= 0; i--) { 165 queue.enqueueFirstNoBlock((MemoryManageable) list.get(i)); 166 } 167 168 // If it is a queue browser, then re-enqueue the browsed 169 // messages. 170 if (ts.isBrowser()) { 171 list = ts.listAckedMessages(); 172 for (int i = list.size() - 1; i >= 0; i--) { 173 queue.enqueueFirstNoBlock((MemoryManageable) list 174 .get(i)); 175 } 176 ts.removeAllAckedMessages(); 177 } 178 179 ts.close(); 180 } 181 } 182 } 183 184 /** 185 * start working 186 * 187 * @throws JMSException 188 */ 189 public void start() throws JMSException { 190 if (started.commit(false, true)) { 191 messageStore.start(); 192 messageStore.recover(new RecoveryListener() { 193 public void recoverMessage(MessageIdentity messageIdentity) throws JMSException { 194 recoverMessageToBeDelivered(messageIdentity); 195 } 196 }); 197 checkRunning(); 198 } 199 } 200 201 private void recoverMessageToBeDelivered(MessageIdentity msgId) throws JMSException { 202 DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), messageStore.getMessage(msgId)); 203 queue.enqueue(pointer); 204 } 205 206 /** 207 * enqueue a message for dispatching 208 * 209 * @param message 210 * @throws JMSException 211 */ 212 public void enqueue(final ActiveMQMessage message) throws JMSException { 213 final DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message); 214 if (message.isAdvisory()) { 215 doAdvisoryDispatchMessage(pointer); 216 } 217 else { 218 messageStore.addMessage(message); 219 // If there is no transaction.. then this executes directly. 220 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){ 221 public void execute() throws Throwable { 222 queue.enqueue(pointer); 223 checkRunning(); 224 } 225 }); 226 } 227 } 228 229 public void redeliver(DurableMessagePointer message) { 230 queue.enqueueFirstNoBlock(message); 231 checkRunning(); 232 } 233 234 public void redeliver(List messages) { 235 queue.enqueueAllFirstNoBlock(messages); 236 checkRunning(); 237 } 238 239 /** 240 * stop working 241 */ 242 public void stop() { 243 started.set(false); 244 running.set(false); 245 queue.clear(); 246 } 247 248 /** 249 * close down this container 250 * 251 * @throws JMSException 252 */ 253 public void close() throws JMSException { 254 if (started.get()) { 255 stop(); 256 } 257 synchronized(subscriptionsMutex){ 258 QueueListEntry entry = subscriptions.getFirstEntry(); 259 while (entry != null) { 260 DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement(); 261 ts.close(); 262 entry = subscriptions.getNextEntry(entry); 263 } 264 subscriptions.clear(); 265 } 266 } 267 268 /** 269 * do some dispatching 270 */ 271 public void run() { 272 // Only allow one thread at a time to dispatch. 273 synchronized (dispatchMutex) { 274 boolean dispatched = false; 275 boolean targeted = false; 276 DurableMessagePointer messagePointer = null; 277 int notDispatchedCount = 0; 278 int sleepTime = 250; 279 int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10 280 // seconds 281 Map messageParts = new HashMap(); 282 try { 283 while (started.get() && running.get()) { 284 dispatched = false; 285 targeted = false; 286 synchronized (subscriptionsMutex) { 287 if (!subscriptions.isEmpty()) { 288 messagePointer = (DurableMessagePointer) queue 289 .dequeue(sleepTime); 290 if (messagePointer != null) { 291 ActiveMQMessage message = messagePointer 292 .getMessage(); 293 if (!message.isExpired()) { 294 295 QueueListEntry entry = subscriptions 296 .getFirstEntry(); 297 while (entry != null) { 298 DurableQueueSubscription ts = (DurableQueueSubscription) entry 299 .getElement(); 300 if (ts.isTarget(message)) { 301 targeted = true; 302 if (message.isMessagePart()) { 303 DurableQueueSubscription sameTarget = (DurableQueueSubscription) messageParts 304 .get(message 305 .getParentMessageID()); 306 if (sameTarget == null) { 307 sameTarget = ts; 308 messageParts 309 .put( 310 message 311 .getParentMessageID(), 312 sameTarget); 313 } 314 sameTarget 315 .doDispatch(messagePointer); 316 if (message.isLastMessagePart()) { 317 messageParts 318 .remove(message 319 .getParentMessageID()); 320 } 321 messagePointer = null; 322 dispatched = true; 323 notDispatchedCount = 0; 324 break; 325 } else if (ts.canAcceptMessages()) { 326 ts.doDispatch(messagePointer); 327 messagePointer = null; 328 dispatched = true; 329 notDispatchedCount = 0; 330 subscriptions.rotate(); 331 break; 332 } 333 } 334 entry = subscriptions 335 .getNextEntry(entry); 336 } 337 338 } else { 339 // expire message 340 if (log.isDebugEnabled()) { 341 log.debug("expired message: " 342 + messagePointer); 343 } 344 if (deadLetterPolicy != null) { 345 deadLetterPolicy 346 .sendToDeadLetter(messagePointer 347 .getMessage()); 348 } 349 messagePointer = null; 350 } 351 } 352 } 353 } 354 if (!dispatched) { 355 if (messagePointer != null) { 356 if (targeted) { 357 queue.enqueueFirstNoBlock(messagePointer); 358 } else { 359 //no matching subscribers - dump to end and hope one shows up ... 360 queue.enqueueNoBlock(messagePointer); 361 362 } 363 } 364 if (running.get()) { 365 if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping 366 && queue.isEmpty()) { 367 synchronized (running) { 368 running.commit(true, false); 369 } 370 } else { 371 Thread.sleep(sleepTime); 372 } 373 } 374 } 375 } 376 } catch (InterruptedException ie) { 377 //someone is stopping us from another thread 378 } catch (Throwable e) { 379 log.warn("stop dispatching", e); 380 stop(); 381 } 382 } 383 } 384 385 private DurableQueueSubscription findMatch(ConsumerInfo info) throws JMSException { 386 DurableQueueSubscription result = null; 387 synchronized (subscriptionsMutex) { 388 QueueListEntry entry = subscriptions.getFirstEntry(); 389 while (entry != null) { 390 DurableQueueSubscription ts = (DurableQueueSubscription) entry 391 .getElement(); 392 if (ts.getConsumerInfo().equals(info)) { 393 result = ts; 394 break; 395 } 396 entry = subscriptions.getNextEntry(entry); 397 } 398 } 399 return result; 400 } 401 402 /** 403 * @return the destination associated with this container 404 */ 405 public ActiveMQDestination getDestination() { 406 return destination; 407 } 408 409 /** 410 * @return the destination name 411 */ 412 public String getDestinationName() { 413 return destination.getPhysicalName(); 414 } 415 416 protected void clear() { 417 queue.clear(); 418 } 419 420 protected void removeExpiredMessages() { 421 long currentTime = System.currentTimeMillis(); 422 List list = queue.getContents(); 423 for (int i = 0;i < list.size();i++) { 424 DurableMessagePointer msgPointer = (DurableMessagePointer) list.get(i); 425 ActiveMQMessage message = msgPointer.getMessage(); 426 if (message.isExpired(currentTime)) { 427 // TODO: remove message from message store. 428 queue.remove(msgPointer); 429 if (log.isDebugEnabled()) { 430 log.debug("expired message: " + msgPointer); 431 } 432 } 433 } 434 } 435 436 protected void checkRunning(){ 437 if (!running.get() && started.get() && !subscriptions.isEmpty()) { 438 synchronized (running) { 439 if (running.commit(false, true)) { 440 try { 441 threadPool.execute(this); 442 } 443 catch (InterruptedException e) { 444 log.error(this + " Couldn't start executing ",e); 445 } 446 } 447 } 448 } 449 } 450 451 452 /** 453 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin() 454 */ 455 public MessageContainerAdmin getMessageContainerAdmin() { 456 return this; 457 } 458 459 /** 460 * @see org.activemq.service.MessageContainerAdmin#empty() 461 */ 462 public void empty() throws JMSException { 463 if( subscriptions.isEmpty() ) { 464 messageStore.removeAllMessages(); 465 queue.clear(); 466 } else { 467 throw new JMSException("Cannot empty a queue while it is use."); 468 } 469 } 470 471 /** 472 * Dispatch an Advisory Message 473 * @param messagePointer 474 */ 475 private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer messagePointer) { 476 ActiveMQMessage message = messagePointer.getMessage(); 477 try { 478 479 if (message.isAdvisory() && !message.isExpired()) { 480 synchronized (subscriptionsMutex) { 481 QueueListEntry entry = subscriptions.getFirstEntry(); 482 while (entry != null) { 483 DurableQueueSubscription ts = (DurableQueueSubscription) entry 484 .getElement(); 485 if (ts.isTarget(message)) { 486 ts.doDispatch(messagePointer); 487 break; 488 } 489 entry = subscriptions.getNextEntry(entry); 490 } 491 } 492 } 493 } catch (JMSException jmsEx) { 494 log.warn("Failed to dispatch advisory", jmsEx); 495 } 496 } 497 498 }