package org.codehaus.activemq.service.vm;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.ActiveMQBrokerClient;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.router.filter.AndFilter;
import org.codehaus.activemq.router.filter.Filter;
import org.codehaus.activemq.router.filter.FilterFactory;
import org.codehaus.activemq.router.filter.FilterFactoryImpl;
import org.codehaus.activemq.router.filter.NoLocalFilter;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.vm.util.VMLinkedList;
import org.codehaus.activemq.service.vm.util.VMLinkedListEntry;

/* loaded from: input_file:org/codehaus/activemq/service/vm/VMQueueMessageContainerManager.class */
public class VMQueueMessageContainerManager implements MessageContainerManager {
    private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
    protected FilterFactory filterFactory = new FilterFactoryImpl();
    protected ConcurrentHashMap subscriptions = new ConcurrentHashMap();
    protected ConcurrentHashMap activeSubscriptions = new ConcurrentHashMap();
    protected Map messageContainers = new ConcurrentHashMap();
    protected Dispatcher dispatcher = new VMDispatcher();

    /* JADX INFO: Access modifiers changed from: package-private */
    public VMQueueMessageContainerManager() {
        this.dispatcher.register(this);
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void addMessageConsumer(ActiveMQBrokerClient activeMQBrokerClient, ConsumerInfo consumerInfo) throws JMSException {
        if (consumerInfo.getDestination().isQueue()) {
            VMSubscription vMSubscription = new VMSubscription(this.dispatcher, consumerInfo, createFilter(consumerInfo));
            this.subscriptions.put(consumerInfo.getConsumerId(), vMSubscription);
            vMSubscription.setActive(true);
            this.dispatcher.addActiveSubscription(activeMQBrokerClient, vMSubscription);
            updateActiveSubscriptions(vMSubscription);
        }
    }

    private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
        Iterator it = this.messageContainers.values().iterator();
        while (it.hasNext()) {
            updateActiveSubscriptions((MessageContainer) it.next(), subscription);
        }
    }

    private void updateActiveSubscriptions(MessageContainer messageContainer, Subscription subscription) throws JMSException {
        if (messageContainer.getDestinationName().equals(subscription.getDestination().toString())) {
            messageContainer.reset();
            VMLinkedList subscriptionList = getSubscriptionList(messageContainer);
            if (subscriptionList.contains(subscription)) {
                return;
            }
            subscriptionList.add(subscription);
        }
    }

    private VMLinkedList getSubscriptionList(MessageContainer messageContainer) {
        VMLinkedList vMLinkedList = (VMLinkedList) this.activeSubscriptions.get(messageContainer);
        if (vMLinkedList == null) {
            vMLinkedList = new VMLinkedList();
            this.activeSubscriptions.put(messageContainer, vMLinkedList);
        }
        return vMLinkedList;
    }

    protected Filter createFilter(ConsumerInfo consumerInfo) throws JMSException {
        Filter createFilter = this.filterFactory.createFilter(consumerInfo.getDestination(), consumerInfo.getSelector());
        if (consumerInfo.isNoLocal()) {
            createFilter = new AndFilter(createFilter, new NoLocalFilter(consumerInfo.getClientId()));
        }
        return createFilter;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void removeMessageConsumer(ActiveMQBrokerClient activeMQBrokerClient, ConsumerInfo consumerInfo) throws JMSException {
        Subscription subscription;
        if (consumerInfo.getDestination() == null || !consumerInfo.getDestination().isQueue() || (subscription = (Subscription) this.subscriptions.remove(consumerInfo.getConsumerId())) == null) {
            return;
        }
        subscription.setActive(false);
        subscription.clear();
        this.dispatcher.removeActiveSubscription(activeMQBrokerClient, subscription);
        for (MessageContainer messageContainer : this.messageContainers.values()) {
            if (messageContainer.getDestinationName().equals(subscription.getDestination().toString())) {
                VMLinkedList subscriptionList = getSubscriptionList(messageContainer);
                subscriptionList.remove(subscription);
                if (subscriptionList.isEmpty()) {
                    this.activeSubscriptions.remove(subscription.getDestination().toString());
                }
            }
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void deleteSubscription(String str, String str2) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.Service
    public void start() throws JMSException {
        this.dispatcher.start();
    }

    @Override // org.codehaus.activemq.service.Service
    public void stop() throws JMSException {
        this.dispatcher.stop();
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void sendMessage(ActiveMQBrokerClient activeMQBrokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        if (((ActiveMQDestination) activeMQMessage.getJMSDestination()).isQueue()) {
            getContainer(activeMQMessage.getJMSDestination().toString()).addMessage(activeMQMessage);
        }
    }

    protected MessageContainer getContainer(String str) throws JMSException {
        MessageContainer messageContainer = (MessageContainer) this.messageContainers.get(str);
        if (messageContainer == null) {
            messageContainer = new VMQueueMessageContainer(str);
            this.messageContainers.put(str, messageContainer);
            Iterator it = this.subscriptions.values().iterator();
            while (it.hasNext()) {
                updateActiveSubscriptions(messageContainer, (Subscription) it.next());
            }
        }
        return messageContainer;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void acknowledgeMessage(ActiveMQBrokerClient activeMQBrokerClient, MessageAck messageAck) throws JMSException {
        Subscription subscription = (Subscription) this.subscriptions.get(messageAck.getConsumerId());
        if (subscription != null) {
            subscription.removeMessage(messageAck.getMessageID());
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void poll() throws JMSException {
        for (MessageContainer messageContainer : this.activeSubscriptions.keySet()) {
            doPoll(messageContainer, (VMLinkedList) this.activeSubscriptions.get(messageContainer));
        }
    }

    private void doPoll(MessageContainer messageContainer, VMLinkedList vMLinkedList) throws JMSException {
        int i = 0;
        do {
            boolean z = false;
            ActiveMQMessage poll = messageContainer.poll();
            if (poll != null) {
                VMLinkedListEntry firstEntry = vMLinkedList.getFirstEntry();
                while (true) {
                    VMLinkedListEntry vMLinkedListEntry = firstEntry;
                    if (vMLinkedListEntry == null) {
                        break;
                    }
                    VMSubscription vMSubscription = (VMSubscription) vMLinkedListEntry.getElement();
                    if (vMSubscription.isTarget(poll) && !vMSubscription.isAtPrefetchLimit()) {
                        vMSubscription.addMessage(messageContainer, poll);
                        vMLinkedList.rotate();
                        z = true;
                        i++;
                        break;
                    }
                    firstEntry = vMLinkedList.getNextEntry(vMLinkedListEntry);
                }
            }
            if (!z || poll == null) {
                return;
            }
        } while (i < MAX_MESSAGES_DISPATCHED_FROM_POLL);
    }
}
