001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * Copyright 2004 Hiram Chirino 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 020 package org.activemq.io.util; 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 import java.util.List; 024 025 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 026 027 /** 028 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active 029 * MemoryBoundedQueues created by this instance stays within the memory usage bounds set. 030 * 031 * @version $Revision: 1.1.1.1 $ 032 */ 033 public class MemoryBoundedQueueManager { 034 035 private final ConcurrentHashMap activeQueues = new ConcurrentHashMap(); 036 private final MemoryBoundedObjectManager memoryManager; 037 038 /** 039 * @param name 040 * @param maxSize 041 */ 042 public MemoryBoundedQueueManager(MemoryBoundedObjectManager memoryManager) { 043 this.memoryManager = memoryManager; 044 } 045 046 /** 047 * retrieve a named MemoryBoundedQueue or creates one if not found 048 * 049 * @param name 050 * @return an named instance of a MemoryBoundedQueue 051 */ 052 public MemoryBoundedQueue getMemoryBoundedQueue(String name) { 053 MemoryBoundedQueue result = (MemoryBoundedQueue) activeQueues.get(name); 054 if (result == null) { 055 if (memoryManager.isSupportJMSPriority()) 056 result = new MemoryBoundedPrioritizedQueue(this, name); 057 else 058 result = new MemoryBoundedQueue(this, name); 059 activeQueues.put(name, result); 060 } 061 return result; 062 } 063 064 /** 065 * close this queue manager and all associated MemoryBoundedQueues 066 */ 067 public void close() { 068 memoryManager.close(); 069 } 070 071 /** 072 * @return Returns the memoryManager. 073 */ 074 public MemoryBoundedObjectManager getMemoryManager() { 075 return memoryManager; 076 } 077 078 public int getCurrentCapacity() { 079 return memoryManager.getCurrentCapacity(); 080 } 081 082 public void add(MemoryBoundedQueue queue) { 083 memoryManager.add(queue); 084 } 085 086 public void remove(MemoryBoundedQueue queue) { 087 memoryManager.remove(queue); 088 activeQueues.remove(queue.getName()); 089 } 090 091 public boolean isFull() { 092 return memoryManager.isFull(); 093 } 094 095 public void incrementMemoryUsed(int size) { 096 memoryManager.incrementMemoryUsed(size); 097 } 098 099 public void decrementMemoryUsed(int size) { 100 memoryManager.decrementMemoryUsed(size); 101 } 102 103 public List getMemoryBoundedQueues(){ 104 return new ArrayList(activeQueues.values()); 105 } 106 107 public String dumpContents(){ 108 String result = "Memory = " + memoryManager.getTotalMemoryUsedSize() + " , capacity = " + memoryManager.getCurrentCapacity() + "\n"; 109 for (Iterator i = activeQueues.values().iterator(); i.hasNext(); ){ 110 MemoryBoundedQueue q = (MemoryBoundedQueue)i.next(); 111 result += "\t" + q.getName() + " enqueued = " + q.getContents().size() + " memory = " + q.getLocalMemoryUsedByThisQueue() + "\n"; 112 } 113 result += "\n\n"; 114 return result; 115 } 116 117 }