package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.codehaus.activemq.DuplicateDurableSubscriptionException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.DestinationMap;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQTopic;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.TopicMessageContainer;
import org.codehaus.activemq.store.PersistenceAdapter;

/* loaded from: input_file:org/codehaus/activemq/service/impl/DurableTopicMessageContainerManager.class */
/**
 * Message container manager that handles durable topic consumers and the
 * persistent topic messages sent to them.
 *
 * <p>Subscriptions live in a {@link SubscriptionContainer}; the subset that
 * currently has a connected consumer is additionally tracked in
 * {@link #activeSubscriptions}, keyed by consumer id. Topic message containers
 * are created through the {@link PersistenceAdapter} and indexed in a
 * {@link DestinationMap} so they can be looked up by a subscription's
 * destination.
 */
public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
    /**
     * Delivery mode value that {@link #sendMessage} requires; this is the
     * numeric value of {@code javax.jms.DeliveryMode.PERSISTENT}.
     */
    private static final int PERSISTENT_DELIVERY_MODE = 2;

    private final PersistenceAdapter persistenceAdapter;
    protected SubscriptionContainer subscriptionContainer;
    protected FilterFactory filterFactory;
    /** Subscriptions that currently have a connected consumer, keyed by consumer id. */
    protected Map activeSubscriptions;
    /** Index of the topic message containers created by {@link #createContainer(String)}. */
    private final DestinationMap destinationMap;
    /** Set once {@link #loadAllMessageContainers()} has run; guarded by {@code synchronized (this)}. */
    private boolean loadedMessageContainers;

    /**
     * Creates a manager that uses a {@link DurableTopicSubscriptionContainerImpl},
     * a {@link FilterFactoryImpl} and a {@link DispatcherImpl}.
     *
     * @param persistenceAdapter adapter used to create topic message containers
     * @param redeliveryPolicy   policy handed to the subscription container
     */
    public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy) {
        this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(redeliveryPolicy), new FilterFactoryImpl(), new DispatcherImpl());
    }

    /**
     * Creates a manager from explicitly supplied collaborators.
     *
     * @param persistenceAdapter    adapter used to create topic message containers
     * @param subscriptionContainer container holding the subscriptions
     * @param filterFactory         factory used to build consumer filters
     * @param dispatcher            dispatcher passed to the superclass
     */
    public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(dispatcher);
        this.activeSubscriptions = new ConcurrentHashMap();
        this.destinationMap = new DestinationMap();
        this.persistenceAdapter = persistenceAdapter;
        this.subscriptionContainer = subscriptionContainer;
        this.filterFactory = filterFactory;
    }

    /**
     * Registers the consumer if it is a durable topic consumer; any other
     * consumer is ignored by this manager.
     *
     * @param brokerClient client that owns the consumer
     * @param consumerInfo description of the consumer being added
     * @throws JMSException if the registration fails
     */
    @Override
    public void addMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        if (!consumerInfo.isDurableTopic()) {
            return;
        }
        doAddMessageConsumer(brokerClient, consumerInfo);
    }

    /**
     * Deactivates the subscription of the given consumer. The subscription is
     * only removed from the active set and from the dispatcher; it is not
     * removed from the subscription container.
     *
     * @param brokerClient client that owns the consumer
     * @param consumerInfo description of the consumer being removed
     * @throws JMSException declared by the interface
     */
    @Override
    public void removeMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        Subscription removed = (Subscription) this.activeSubscriptions.remove(consumerInfo.getConsumerId());
        if (removed == null) {
            return;
        }
        removed.setActive(false);
        this.dispatcher.removeActiveSubscription(brokerClient, removed);
    }

    /**
     * Deletes every subscription whose client id and subscriber name match the
     * arguments.
     *
     * @param str  client id of the subscription to delete
     * @param str2 subscriber name of the subscription to delete
     * @throws JMSException                if a matching subscription is still active
     * @throws InvalidDestinationException if no subscription matches
     */
    @Override
    public void deleteSubscription(String str, String str2) throws JMSException {
        final String clientId = str;
        final String subscriberName = str2;
        boolean deleted = false;
        // NOTE(review): the subscription is removed from the container while its
        // iterator is still in use; confirm the container's iterator tolerates this.
        for (Iterator it = this.subscriptionContainer.subscriptionIterator(); it.hasNext();) {
            Subscription candidate = (Subscription) it.next();
            boolean matches = candidate.getClientId().equals(clientId)
                    && candidate.getSubscriberName().equals(subscriberName);
            if (!matches) {
                continue;
            }
            if (candidate.isActive()) {
                throw new JMSException("The Consummer " + subscriberName + " is still active");
            }
            this.subscriptionContainer.removeSubscription(candidate.getConsumerId());
            candidate.clear();
            deleted = true;
        }
        if (!deleted) {
            throw new InvalidDestinationException("The Consumer " + subscriberName + " does not exist for client: " + clientId);
        }
    }

    /**
     * Handles a message whose destination is a topic and whose delivery mode is
     * persistent; every other message is ignored.
     *
     * <p>The message is added to the destination's container before the
     * subscriptions are consulted, so it is added even when no subscription
     * exists. Send statistics are only updated when at least one subscription
     * was returned for the destination.
     *
     * @param brokerClient    client that sent the message
     * @param activeMQMessage message to handle
     * @throws JMSException if the container or a subscription rejects the message
     */
    @Override
    public void sendMessage(BrokerClient brokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        ActiveMQDestination destination = (ActiveMQDestination) activeMQMessage.getJMSDestination();
        boolean persistentTopicMessage = destination != null
                && destination.isTopic()
                && activeMQMessage.getJMSDeliveryMode() == PERSISTENT_DELIVERY_MODE;
        if (!persistentTopicMessage) {
            return;
        }
        MessageContainer container = getContainer(activeMQMessage.getJMSDestination().toString());
        Set subscriptions = this.subscriptionContainer.getSubscriptions(activeMQMessage.getJMSActiveMQDestination());
        container.addMessage(activeMQMessage);
        if (subscriptions.isEmpty()) {
            return;
        }
        for (Iterator it = subscriptions.iterator(); it.hasNext();) {
            Subscription subscription = (Subscription) it.next();
            if (subscription.isTarget(activeMQMessage)) {
                subscription.addMessage(container, activeMQMessage);
            }
        }
        updateSendStats(brokerClient, activeMQMessage);
    }

    /**
     * Forwards a topic acknowledgement to the active subscription of the
     * acknowledging consumer. Acknowledgements for non-topic destinations, or
     * from consumers without an active subscription, are ignored.
     *
     * @param brokerClient client that sent the acknowledgement
     * @param messageAck   the acknowledgement
     * @throws JMSException if the subscription fails to process it
     */
    @Override
    public void acknowledgeMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        if (!messageAck.getDestination().isTopic()) {
            return;
        }
        Subscription subscription = (Subscription) this.activeSubscriptions.get(messageAck.getConsumerId());
        if (subscription != null) {
            subscription.messageConsumed(messageAck);
        }
    }

    /**
     * Notifies the acknowledging consumer's active subscription, if any, of an
     * acknowledgement made inside a transaction that has not yet committed.
     *
     * @param brokerClient client that sent the acknowledgement
     * @param str          unused here (presumably the transaction id - confirm against the interface)
     * @param messageAck   the acknowledgement
     * @throws JMSException if the subscription fails to process it
     */
    @Override
    public void acknowledgeTransactedMessage(BrokerClient brokerClient, String str, MessageAck messageAck) throws JMSException {
        Subscription subscription = (Subscription) this.activeSubscriptions.get(messageAck.getConsumerId());
        if (subscription == null) {
            return;
        }
        subscription.onAcknowledgeTransactedMessageBeforeCommit(messageAck);
    }

    /**
     * Asks the consumer's active subscription to redeliver a message, using the
     * first known container that reports containing it. Does nothing when the
     * consumer has no active subscription or no container contains the message.
     *
     * @param brokerClient client requesting redelivery
     * @param messageAck   identifies the consumer and the message
     * @throws JMSException if the lookup or redelivery fails
     */
    @Override
    public void redeliverMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        Subscription subscription = (Subscription) this.activeSubscriptions.get(messageAck.getConsumerId());
        if (subscription == null) {
            return;
        }
        for (Iterator it = this.messageContainers.values().iterator(); it.hasNext();) {
            MessageContainer container = (MessageContainer) it.next();
            if (container.containsMessage(messageAck.getMessageIdentity())) {
                subscription.redeliverMessage(container, messageAck);
                return;
            }
        }
    }

    /** No-op in this manager. */
    @Override
    public void poll() throws JMSException {
    }

    /** No-op in this manager. */
    @Override
    public void commitTransaction(BrokerClient brokerClient, String str) {
    }

    /** No-op in this manager. */
    @Override
    public void rollbackTransaction(BrokerClient brokerClient, String str) {
    }

    /**
     * Creates a topic message container through the persistence adapter and
     * indexes it in the destination map under an {@link ActiveMQTopic} built
     * from the same name.
     *
     * @param str name of the topic
     * @return the newly created container
     * @throws JMSException if the persistence adapter fails
     */
    @Override
    protected MessageContainer createContainer(String str) throws JMSException {
        final String topicName = str;
        TopicMessageContainer container = this.persistenceAdapter.createTopicMessageContainer(topicName);
        this.destinationMap.put(new ActiveMQTopic(topicName), container);
        return container;
    }

    /**
     * @param str name of the topic
     * @return a new {@link ActiveMQTopic} with the given name
     */
    @Override
    protected Destination createDestination(String str) {
        return new ActiveMQTopic(str);
    }

    /**
     * Registers and activates a durable subscription for the consumer.
     *
     * <p>An existing durable subscription is reused unless its destination or
     * selector differs from the consumer's, in which case it is cleared and
     * replaced. Recovery via {@link #recoverSubscriptions(Subscription)} runs
     * only when no durable subscription existed beforehand; a replaced
     * subscription is not recovered.
     *
     * @param brokerClient client that owns the consumer
     * @param consumerInfo description of the consumer
     * @throws DuplicateDurableSubscriptionException if an active subscription is
     *         already the same durable subscription
     * @throws JMSException if creating, storing or recovering the subscription fails
     */
    public void doAddMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        if (consumerInfo.getConsumerName() != null && consumerInfo.getClientId() != null) {
            for (Iterator it = this.activeSubscriptions.values().iterator(); it.hasNext();) {
                Subscription active = (Subscription) it.next();
                if (active.isSameDurableSubscription(consumerInfo)) {
                    throw new DuplicateDurableSubscriptionException(consumerInfo);
                }
            }
        }

        boolean shouldRecover = false;
        Subscription subscription = this.subscriptionContainer.getSubscription(consumerInfo.getConsumerId());
        if (subscription == null || !subscription.isDurableTopic()) {
            subscription = this.subscriptionContainer.makeSubscription(this.dispatcher, brokerClient, consumerInfo, createFilter(consumerInfo));
            shouldRecover = true;
        } else if (!isEqualOrBothNull(subscription.getDestination(), consumerInfo.getDestination())
                || !isEqualOrBothNull(subscription.getSelector(), consumerInfo.getSelector())) {
            // The consumer changed its destination or selector: discard the stale
            // subscription and start again with a fresh one.
            this.subscriptionContainer.removeSubscription(consumerInfo.getConsumerId());
            subscription.clear();
            subscription = this.subscriptionContainer.makeSubscription(this.dispatcher, brokerClient, consumerInfo, createFilter(consumerInfo));
        }

        subscription.setActiveConsumer(brokerClient, consumerInfo);
        this.activeSubscriptions.put(consumerInfo.getConsumerId(), subscription);
        this.dispatcher.addActiveSubscription(brokerClient, subscription);

        ensureContainersLoaded(subscription);
        for (Iterator it = this.destinationMap.get(subscription.getDestination()).iterator(); it.hasNext();) {
            TopicMessageContainer container = (TopicMessageContainer) it.next();
            if (container instanceof DurableTopicMessageContainer) {
                ((DurableTopicMessageContainer) container).storeSubscription(consumerInfo, subscription);
            }
        }
        if (shouldRecover) {
            recoverSubscriptions(subscription);
        }
        subscription.setActive(true);
    }

    /**
     * Asks every container that the destination map returns for the
     * subscription's destination to recover the subscription, after making sure
     * the relevant containers are loaded.
     *
     * @param subscription subscription to recover
     * @throws JMSException if loading a container or recovering fails
     */
    protected void recoverSubscriptions(Subscription subscription) throws JMSException {
        ensureContainersLoaded(subscription);
        for (Iterator it = this.destinationMap.get(subscription.getDestination()).iterator(); it.hasNext();) {
            TopicMessageContainer container = (TopicMessageContainer) it.next();
            container.recoverSubscription(subscription);
        }
    }

    /**
     * Calls {@code loadContainer} for every entry of the persistence adapter's
     * initial destinations; does nothing when the adapter returns {@code null}.
     *
     * @throws JMSException if loading a container fails
     */
    protected void loadAllMessageContainers() throws JMSException {
        Map initialDestinations = this.persistenceAdapter.getInitialDestinations();
        if (initialDestinations == null) {
            return;
        }
        for (Iterator it = initialDestinations.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry = (Map.Entry) it.next();
            loadContainer((String) entry.getKey(), (Destination) entry.getValue());
        }
    }

    /**
     * Builds the filter for a consumer from its destination and selector,
     * combined with a {@link NoLocalFilter} on the consumer's client id when the
     * consumer is flagged no-local.
     *
     * @param consumerInfo description of the consumer
     * @return the filter to apply to messages for this consumer
     * @throws JMSException if the filter factory fails
     */
    protected Filter createFilter(ConsumerInfo consumerInfo) throws JMSException {
        Filter filter = this.filterFactory.createFilter(consumerInfo.getDestination(), consumerInfo.getSelector());
        if (!consumerInfo.isNoLocal()) {
            return filter;
        }
        return new AndFilter(filter, new NoLocalFilter(consumerInfo.getClientId()));
    }

    /**
     * Makes sure the containers a subscription needs exist: for a wildcard
     * subscription all containers are loaded once, otherwise only the container
     * for the subscription's physical destination name is fetched.
     */
    private void ensureContainersLoaded(Subscription subscription) throws JMSException {
        if (subscription.isWildcard()) {
            synchronized (this) {
                if (!this.loadedMessageContainers) {
                    loadAllMessageContainers();
                    this.loadedMessageContainers = true;
                }
            }
        } else {
            getContainer(subscription.getDestination().getPhysicalName());
        }
    }

    /** Null-safe equality: true when both are {@code null} or {@code left.equals(right)}. */
    private static boolean isEqualOrBothNull(Object left, Object right) {
        return left == null ? right == null : left.equals(right);
    }
}
