package org.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import org.activemq.broker.BrokerClient;
import org.activemq.filter.AndFilter;
import org.activemq.filter.DestinationMap;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQQueue;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.message.util.MemoryBoundedQueueManager;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.PersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/activemq/service/boundedvm/DurableQueueBoundedMessageManager.class */
public class DurableQueueBoundedMessageManager implements MessageContainerManager, Runnable {
    private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10;
    private static final long DEFAULT_INACTIVE_TIMEOUT = 30000;
    private static final Log log;
    private MemoryBoundedQueueManager queueManager;
    private long inactiveTimeout;
    private int garbageCoolectionCapacityLimit;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;
    private final PersistenceAdapter persistenceAdapter;
    static Class class$org$activemq$service$boundedvm$DurableQueueBoundedMessageManager;
    private ConcurrentHashMap containers = new ConcurrentHashMap();
    private DestinationMap destinationMap = new DestinationMap();
    private Map destinations = new ConcurrentHashMap();
    private ConcurrentHashMap subscriptions = new ConcurrentHashMap();
    private FilterFactory filterFactory = new FilterFactoryImpl();
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private SynchronizedBoolean doingGarbageCollection = new SynchronizedBoolean(false);
    private PooledExecutor threadPool = new PooledExecutor();

    /* loaded from: input_file:org/activemq/service/boundedvm/DurableQueueBoundedMessageManager$DurableQueueThreadFactory.class */
    protected static class DurableQueueThreadFactory implements ThreadFactory {
        protected DurableQueueThreadFactory() {
        }

        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "Durable Queue Worker");
            thread.setPriority(6);
            thread.setDaemon(true);
            return thread;
        }
    }

    public DurableQueueBoundedMessageManager(PersistenceAdapter persistenceAdapter, MemoryBoundedQueueManager memoryBoundedQueueManager, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.persistenceAdapter = persistenceAdapter;
        this.queueManager = memoryBoundedQueueManager;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        this.threadPool.setThreadFactory(new DurableQueueThreadFactory());
        this.inactiveTimeout = DEFAULT_INACTIVE_TIMEOUT;
        this.garbageCoolectionCapacityLimit = 10;
    }

    public int getGarbageCoolectionCapacityLimit() {
        return this.garbageCoolectionCapacityLimit;
    }

    public void setGarbageCoolectionCapacityLimit(int i) {
        this.garbageCoolectionCapacityLimit = i;
    }

    public long getInactiveTimeout() {
        return this.inactiveTimeout;
    }

    public void setInactiveTimeout(long j) {
        this.inactiveTimeout = j;
    }

    @Override // org.activemq.service.Service
    public void start() throws JMSException {
        if (this.started.commit(false, true)) {
            Iterator it = this.containers.values().iterator();
            while (it.hasNext()) {
                ((DurableQueueBoundedMessageContainer) it.next()).start();
            }
            try {
                this.threadPool.execute(this);
            } catch (InterruptedException e) {
                JMSException jMSException = new JMSException("Garbage collection interupted on start()");
                jMSException.setLinkedException(e);
                throw jMSException;
            }
        }
    }

    @Override // org.activemq.service.Service
    public void stop() throws JMSException {
        if (this.started.commit(true, false)) {
            Iterator it = this.containers.values().iterator();
            while (it.hasNext()) {
                ((DurableQueueBoundedMessageContainer) it.next()).stop();
            }
            this.threadPool.interruptAll();
            this.threadPool.shutdownNow();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.started.get()) {
            doGarbageCollection();
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // org.activemq.service.MessageContainerManager
    public synchronized void addMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (!destination.isQueue() || destination.isTemporary()) {
            return;
        }
        DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer = (DurableQueueBoundedMessageContainer) this.containers.get(destination);
        if (durableQueueBoundedMessageContainer == null) {
            durableQueueBoundedMessageContainer = createContainer(destination, false);
        }
        if (log.isDebugEnabled()) {
            log.debug(new StringBuffer().append("Adding consumer: ").append(consumerInfo).toString());
        }
        DurableQueueSubscription addConsumer = durableQueueBoundedMessageContainer.addConsumer(createFilter(consumerInfo), consumerInfo, brokerClient);
        if (addConsumer != null) {
            this.subscriptions.put(consumerInfo.getConsumerId(), addConsumer);
        }
        String physicalName = destination.getPhysicalName();
        if (this.destinations.containsKey(physicalName)) {
            return;
        }
        this.destinations.put(physicalName, destination);
    }

    private DurableQueueBoundedMessageContainer createContainer(ActiveMQDestination activeMQDestination, boolean z) throws JMSException {
        DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer = new DurableQueueBoundedMessageContainer(this.persistenceAdapter.createQueueMessageStore(activeMQDestination.getPhysicalName()), this.threadPool, this.queueManager, activeMQDestination, z ? null : this.redeliveryPolicy, this.deadLetterPolicy);
        addContainer(durableQueueBoundedMessageContainer);
        if (this.started.get()) {
            durableQueueBoundedMessageContainer.start();
        }
        return durableQueueBoundedMessageContainer;
    }

    @Override // org.activemq.service.MessageContainerManager
    public synchronized void removeMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (!destination.isQueue() || destination.isTemporary()) {
            return;
        }
        for (DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer : this.containers.values()) {
            if (durableQueueBoundedMessageContainer != null) {
                durableQueueBoundedMessageContainer.removeConsumer(consumerInfo);
            }
        }
        this.subscriptions.remove(consumerInfo.getConsumerId());
    }

    @Override // org.activemq.service.MessageContainerManager
    public void deleteSubscription(String str, String str2) throws JMSException {
    }

    @Override // org.activemq.service.MessageContainerManager
    public void sendMessage(BrokerClient brokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        ActiveMQDestination jMSActiveMQDestination = activeMQMessage.getJMSActiveMQDestination();
        if (jMSActiveMQDestination.isQueue() && !jMSActiveMQDestination.isTemporary() && activeMQMessage.isPersistent()) {
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                doGarbageCollection();
            }
            if (((DurableQueueBoundedMessageContainer) this.containers.get(jMSActiveMQDestination)) == null) {
                createContainer(jMSActiveMQDestination, false);
            }
            Iterator it = this.destinationMap.get(activeMQMessage.getJMSActiveMQDestination()).iterator();
            while (it.hasNext()) {
                ((DurableQueueBoundedMessageContainer) it.next()).enqueue(activeMQMessage);
            }
        }
    }

    @Override // org.activemq.service.MessageContainerManager
    public void acknowledgeMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        DurableQueueSubscription durableQueueSubscription;
        DurableMessagePointer acknowledgeMessage;
        ActiveMQDestination destination = messageAck.getDestination();
        if (!destination.isQueue() || destination.isTemporary() || !messageAck.isPersistent() || (durableQueueSubscription = (DurableQueueSubscription) this.subscriptions.get(messageAck.getConsumerId())) == null || (acknowledgeMessage = durableQueueSubscription.acknowledgeMessage(messageAck.getMessageID())) == null) {
            return;
        }
        if (durableQueueSubscription.isBrowser()) {
            durableQueueSubscription.addAckedMessage(acknowledgeMessage);
            return;
        }
        if (!messageAck.isMessageRead() || messageAck.isExpired()) {
            redeliverMessage(durableQueueSubscription, messageAck, acknowledgeMessage);
            return;
        }
        acknowledgeMessage.getMessageStore().removeMessage(messageAck);
        if (TransactionManager.isCurrentTransaction()) {
            if (!durableQueueSubscription.hasAckedMessage()) {
                TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask(this, durableQueueSubscription, messageAck) { // from class: org.activemq.service.boundedvm.DurableQueueBoundedMessageManager.1
                    private final DurableQueueSubscription val$ts;
                    private final MessageAck val$ack;
                    private final DurableQueueBoundedMessageManager this$0;

                    {
                        this.this$0 = this;
                        this.val$ts = durableQueueSubscription;
                        this.val$ack = messageAck;
                    }

                    @Override // org.activemq.service.TransactionTask
                    public void execute() throws Throwable {
                        List<DurableMessagePointer> listAckedMessages = this.val$ts.listAckedMessages();
                        HashMap hashMap = new HashMap();
                        for (DurableMessagePointer durableMessagePointer : listAckedMessages) {
                            ActiveMQMessage message = durableMessagePointer.getMessage();
                            message.setJMSRedelivered(true);
                            if (message.incrementDeliveryCount() >= this.this$0.redeliveryPolicy.getMaximumRetryCount()) {
                                if (DurableQueueBoundedMessageManager.log.isDebugEnabled()) {
                                    DurableQueueBoundedMessageManager.log.debug(new StringBuffer().append("Message: ").append(message).append(" has exceeded its retry count").toString());
                                }
                                this.this$0.deadLetterPolicy.sendToDeadLetter(message);
                            } else if (this.val$ack.isExpired()) {
                                if (DurableQueueBoundedMessageManager.log.isDebugEnabled()) {
                                    DurableQueueBoundedMessageManager.log.debug(new StringBuffer().append("Message: ").append(message).append(" has expired").toString());
                                }
                                this.this$0.deadLetterPolicy.sendToDeadLetter(message);
                            } else {
                                for (DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer : this.this$0.destinationMap.get(message.getJMSActiveMQDestination())) {
                                    LinkedList linkedList = (LinkedList) hashMap.get(durableQueueBoundedMessageContainer);
                                    if (linkedList == null) {
                                        linkedList = new LinkedList();
                                        hashMap.put(durableQueueBoundedMessageContainer, linkedList);
                                    }
                                    linkedList.addFirst(durableMessagePointer);
                                }
                            }
                        }
                        for (DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer2 : hashMap.keySet()) {
                            durableQueueBoundedMessageContainer2.redeliver((List) hashMap.get(durableQueueBoundedMessageContainer2));
                        }
                        this.val$ts.removeAllAckedMessages();
                    }
                });
                TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(this, durableQueueSubscription) { // from class: org.activemq.service.boundedvm.DurableQueueBoundedMessageManager.2
                    private final DurableQueueSubscription val$ts;
                    private final DurableQueueBoundedMessageManager this$0;

                    {
                        this.this$0 = this;
                        this.val$ts = durableQueueSubscription;
                    }

                    @Override // org.activemq.service.TransactionTask
                    public void execute() throws Throwable {
                        this.val$ts.removeAllAckedMessages();
                    }
                });
            }
            durableQueueSubscription.addAckedMessage(acknowledgeMessage);
        }
    }

    private void redeliverMessage(DurableQueueSubscription durableQueueSubscription, MessageAck messageAck, DurableMessagePointer durableMessagePointer) throws JMSException {
        durableMessagePointer.getMessage().setJMSRedelivered(true);
        if (durableMessagePointer.incrementDeliveryCount() >= this.redeliveryPolicy.getMaximumRetryCount()) {
            if (log.isDebugEnabled()) {
                log.debug(new StringBuffer().append("Message: ").append(durableMessagePointer).append(" has exceeded its retry count").toString());
            }
            this.deadLetterPolicy.sendToDeadLetter(durableMessagePointer.getMessage());
        } else if (messageAck.isExpired()) {
            if (log.isDebugEnabled()) {
                log.debug(new StringBuffer().append("Message: ").append(durableMessagePointer).append(" has expired").toString());
            }
            this.deadLetterPolicy.sendToDeadLetter(durableMessagePointer.getMessage());
        } else {
            Iterator it = this.destinationMap.get(durableMessagePointer.getMessage().getJMSActiveMQDestination()).iterator();
            if (it.hasNext()) {
                ((DurableQueueBoundedMessageContainer) it.next()).redeliver(durableMessagePointer);
            }
        }
    }

    @Override // org.activemq.service.MessageContainerManager
    public void poll() throws JMSException {
    }

    @Override // org.activemq.service.MessageContainerManager
    public MessageContainer getContainer(String str) throws JMSException {
        ActiveMQDestination activeMQDestination = (ActiveMQDestination) this.destinations.get(str);
        if (activeMQDestination != null) {
            return (MessageContainer) this.containers.get(activeMQDestination);
        }
        return null;
    }

    @Override // org.activemq.service.MessageContainerManager
    public Map getDestinations() {
        return Collections.unmodifiableMap(this.containers);
    }

    @Override // org.activemq.service.MessageContainerManager
    public Map getLocalDestinations() {
        HashMap hashMap = new HashMap();
        for (DurableSubscription durableSubscription : this.subscriptions.values()) {
            if (durableSubscription.isLocalSubscription()) {
                ActiveMQDestination destination = durableSubscription.getDestination();
                hashMap.put(destination.getPhysicalName(), destination);
            }
        }
        return Collections.unmodifiableMap(hashMap);
    }

    @Override // org.activemq.service.MessageContainerManager
    public DeadLetterPolicy getDeadLetterPolicy() {
        return this.deadLetterPolicy;
    }

    @Override // org.activemq.service.MessageContainerManager
    public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
        this.deadLetterPolicy = deadLetterPolicy;
    }

    protected Filter createFilter(ConsumerInfo consumerInfo) throws JMSException {
        Filter createFilter = this.filterFactory.createFilter(consumerInfo.getDestination(), consumerInfo.getSelector());
        if (consumerInfo.isNoLocal()) {
            createFilter = new AndFilter(createFilter, new NoLocalFilter(consumerInfo.getClientId()));
        }
        return createFilter;
    }

    private void doGarbageCollection() {
        if (this.doingGarbageCollection.commit(true, false)) {
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                Iterator it = this.containers.values().iterator();
                while (it.hasNext()) {
                    ((DurableQueueBoundedMessageContainer) it.next()).removeExpiredMessages();
                }
            }
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                for (DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer : this.containers.values()) {
                    if (!durableQueueBoundedMessageContainer.isActive() && durableQueueBoundedMessageContainer.getIdleTimestamp() < System.currentTimeMillis() - this.inactiveTimeout) {
                        removeContainer(durableQueueBoundedMessageContainer);
                        log.warn(new StringBuffer().append("memory limit low - forced to remove inactive and idle queue: ").append(durableQueueBoundedMessageContainer.getDestinationName()).toString());
                    }
                }
            }
            for (DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer2 : this.containers.values()) {
                if (!durableQueueBoundedMessageContainer2.isActive() && !durableQueueBoundedMessageContainer2.isEmpty()) {
                    removeContainer(durableQueueBoundedMessageContainer2);
                }
            }
            this.doingGarbageCollection.set(false);
        }
    }

    private synchronized void addContainer(DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer) {
        this.containers.put(durableQueueBoundedMessageContainer.getDestination(), durableQueueBoundedMessageContainer);
        this.destinationMap.put(durableQueueBoundedMessageContainer.getDestination(), durableQueueBoundedMessageContainer);
    }

    private synchronized void removeContainer(DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer) {
        try {
            durableQueueBoundedMessageContainer.close();
            log.info(new StringBuffer().append("closed inactive Durable queue container: ").append(durableQueueBoundedMessageContainer.getDestinationName()).toString());
        } catch (JMSException e) {
            log.warn("failure closing container", e);
        }
        this.containers.remove(durableQueueBoundedMessageContainer.getDestination());
        this.destinationMap.remove(durableQueueBoundedMessageContainer.getDestination(), durableQueueBoundedMessageContainer);
    }

    protected Executor getThreadPool() {
        return this.threadPool;
    }

    @Override // org.activemq.service.MessageContainerManager
    public void createMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
        createContainer(activeMQDestination, false);
    }

    @Override // org.activemq.service.MessageContainerManager
    public Map getMessageContainerAdmins() throws JMSException {
        return Collections.unmodifiableMap(this.containers);
    }

    @Override // org.activemq.service.MessageContainerManager
    public void destroyMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
        if (activeMQDestination.isQueue()) {
            DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer = (DurableQueueBoundedMessageContainer) this.containers.remove(activeMQDestination);
            if (durableQueueBoundedMessageContainer != null) {
                durableQueueBoundedMessageContainer.empty();
                durableQueueBoundedMessageContainer.stop();
            }
            this.destinationMap.removeAll(activeMQDestination);
        }
    }

    public void sendToDeadLetterQueue(String str, ActiveMQMessage activeMQMessage) throws JMSException {
        ActiveMQQueue activeMQQueue = new ActiveMQQueue(str);
        DurableQueueBoundedMessageContainer durableQueueBoundedMessageContainer = (DurableQueueBoundedMessageContainer) this.containers.get(activeMQQueue);
        if (durableQueueBoundedMessageContainer == null) {
            durableQueueBoundedMessageContainer = createContainer(activeMQQueue, true);
        }
        durableQueueBoundedMessageContainer.enqueue(activeMQMessage);
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$activemq$service$boundedvm$DurableQueueBoundedMessageManager == null) {
            cls = class$("org.activemq.service.boundedvm.DurableQueueBoundedMessageManager");
            class$org$activemq$service$boundedvm$DurableQueueBoundedMessageManager = cls;
        } else {
            cls = class$org$activemq$service$boundedvm$DurableQueueBoundedMessageManager;
        }
        log = LogFactory.getLog(cls);
    }
}
