package org.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.policy.DispatchPolicy;
import org.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageId;
import org.activemq.command.SubscriptionInfo;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.memory.UsageManager;
import org.activemq.store.MessageRecoveryListener;
import org.activemq.store.MessageStore;
import org.activemq.store.TopicMessageStore;
import org.activemq.thread.TaskRunnerFactory;
import org.activemq.thread.Valve;
import org.activemq.transaction.Synchronization;
import org.activemq.util.SubscriptionKey;

/* loaded from: input_file:org/activemq/broker/region/Topic.class */
public class Topic implements Destination {
    protected final ActiveMQDestination destination;
    protected final TopicMessageStore store;
    protected final UsageManager usageManager;
    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    protected final Valve dispatchValve = new Valve(true);
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();

    public Topic(ActiveMQDestination activeMQDestination, TopicMessageStore topicMessageStore, UsageManager usageManager, DestinationStatistics destinationStatistics, TaskRunnerFactory taskRunnerFactory) {
        this.destination = activeMQDestination;
        this.store = topicMessageStore;
        this.usageManager = usageManager;
        this.destinationStatistics.setParent(destinationStatistics);
    }

    @Override // org.activemq.broker.region.Destination
    public boolean lock(MessageReference messageReference, Subscription subscription) {
        return true;
    }

    @Override // org.activemq.broker.region.Destination
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Throwable {
        this.destinationStatistics.getConsumers().increment();
        subscription.add(connectionContext, this);
        if (subscription.getConsumerInfo().isDurable()) {
            recover((DurableTopicSubscription) subscription, true);
            return;
        }
        if (subscription.getConsumerInfo().isRetroactive()) {
            this.subscriptionRecoveryPolicy.recover(connectionContext, subscription);
        }
        this.consumers.add(subscription);
    }

    public void recover(DurableTopicSubscription durableTopicSubscription, boolean z) throws Throwable {
        this.dispatchValve.turnOff();
        if (z) {
            try {
                this.consumers.add(durableTopicSubscription);
            } finally {
                this.dispatchValve.turnOn();
            }
        }
        if (this.store != null) {
            String clientId = durableTopicSubscription.getClientId();
            String subscriptionName = durableTopicSubscription.getSubscriptionName();
            String selector = durableTopicSubscription.getConsumerInfo().getSelector();
            SubscriptionInfo lookupSubscription = this.store.lookupSubscription(clientId, subscriptionName);
            if (lookupSubscription != null) {
                String selector2 = lookupSubscription.getSelector();
                if (((selector2 == null) ^ (selector == null)) || (selector2 != null && !selector2.equals(selector))) {
                    this.store.deleteSubscription(clientId, subscriptionName);
                    lookupSubscription = null;
                }
            }
            if (lookupSubscription == null) {
                this.store.addSubsciption(clientId, subscriptionName, selector, durableTopicSubscription.getConsumerInfo().isRetroactive());
            }
            if (durableTopicSubscription.isRecovered()) {
                MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
                messageEvaluationContext.setDestination(this.destination);
                this.store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener(this, messageEvaluationContext, durableTopicSubscription) { // from class: org.activemq.broker.region.Topic.1
                    private final MessageEvaluationContext val$msgContext;
                    private final DurableTopicSubscription val$sub;
                    private final Topic this$0;

                    {
                        this.this$0 = this;
                        this.val$msgContext = messageEvaluationContext;
                        this.val$sub = durableTopicSubscription;
                    }

                    @Override // org.activemq.store.MessageRecoveryListener
                    public void recoverMessage(Message message) throws Throwable {
                        message.setRegionDestination(this.this$0);
                        try {
                            this.val$msgContext.setMessageReference(message);
                            if (this.val$sub.matches(message, this.val$msgContext)) {
                                this.val$sub.add(message);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e2) {
                            Thread.currentThread().interrupt();
                        }
                    }

                    @Override // org.activemq.store.MessageRecoveryListener
                    public void recoverMessageReference(String str) throws Throwable {
                        throw new RuntimeException("Should not be called.");
                    }
                });
            }
        }
    }

    @Override // org.activemq.broker.region.Destination
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription) throws Throwable {
        this.destinationStatistics.getConsumers().decrement();
        this.consumers.remove(subscription);
        subscription.remove(connectionContext, this);
    }

    @Override // org.activemq.broker.region.Destination
    public void send(ConnectionContext connectionContext, Message message) throws Throwable {
        if (connectionContext.isProducerFlowControl()) {
            this.usageManager.waitForSpace();
        }
        message.setRegionDestination(this);
        if (this.store != null && message.isPersistent()) {
            this.store.addMessage(connectionContext, message);
        }
        message.incrementReferenceCount();
        try {
            if (connectionContext.isInTransaction()) {
                connectionContext.getTransaction().addSynchronization(new Synchronization(this, connectionContext, message) { // from class: org.activemq.broker.region.Topic.2
                    private final ConnectionContext val$context;
                    private final Message val$message;
                    private final Topic this$0;

                    {
                        this.this$0 = this;
                        this.val$context = connectionContext;
                        this.val$message = message;
                    }

                    @Override // org.activemq.transaction.Synchronization
                    public void afterCommit() throws Throwable {
                        this.this$0.dispatch(this.val$context, this.val$message);
                    }
                });
            } else {
                dispatch(connectionContext, message);
            }
        } finally {
            message.decrementReferenceCount();
        }
    }

    public void deleteSubscription(ConnectionContext connectionContext, SubscriptionKey subscriptionKey) throws IOException {
        if (this.store != null) {
            this.store.deleteSubscription(subscriptionKey.clientId, subscriptionKey.subscriptionName);
        }
    }

    public String toString() {
        return new StringBuffer().append("Topic: destination=").append(this.destination.getPhysicalName()).append(", subscriptions=").append(this.consumers.size()).toString();
    }

    @Override // org.activemq.broker.region.Destination
    public void acknowledge(ConnectionContext connectionContext, Subscription subscription, MessageAck messageAck, MessageReference messageReference) throws IOException {
        if (this.store == null || !messageReference.isPersistent()) {
            return;
        }
        DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) subscription;
        this.store.acknowledge(connectionContext, durableTopicSubscription.getClientId(), durableTopicSubscription.getSubscriptionName(), messageAck.getLastMessageId());
    }

    @Override // org.activemq.broker.region.Destination
    public void dispose(ConnectionContext connectionContext) throws IOException {
        if (this.store != null) {
            this.store.removeAllMessages(connectionContext);
        }
        this.destinationStatistics.setParent(null);
    }

    @Override // org.activemq.broker.region.Destination
    public void gc() {
    }

    @Override // org.activemq.broker.region.Destination
    public Message loadMessage(MessageId messageId) throws IOException {
        return this.store.getMessage(messageId);
    }

    @Override // org.activemq.Service
    public void start() throws Exception {
        this.subscriptionRecoveryPolicy.start();
    }

    @Override // org.activemq.Service
    public void stop() throws Exception {
        this.subscriptionRecoveryPolicy.stop();
    }

    @Override // org.activemq.broker.region.Destination
    public UsageManager getUsageManager() {
        return this.usageManager;
    }

    @Override // org.activemq.broker.region.Destination
    public DestinationStatistics getDestinationStatistics() {
        return this.destinationStatistics;
    }

    @Override // org.activemq.broker.region.Destination
    public ActiveMQDestination getActiveMQDestination() {
        return this.destination;
    }

    public DispatchPolicy getDispatchPolicy() {
        return this.dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return this.subscriptionRecoveryPolicy;
    }

    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
    }

    protected void dispatch(ConnectionContext connectionContext, Message message) throws Throwable {
        this.destinationStatistics.getEnqueues().increment();
        this.dispatchValve.increment();
        MessageEvaluationContext messageEvaluationContext = connectionContext.getMessageEvaluationContext();
        try {
            this.subscriptionRecoveryPolicy.add(connectionContext, message);
            if (this.consumers.isEmpty()) {
                return;
            }
            messageEvaluationContext.setDestination(this.destination);
            messageEvaluationContext.setMessageReference(message);
            this.dispatchPolicy.dispatch(connectionContext, message, messageEvaluationContext, this.consumers);
            messageEvaluationContext.clear();
            this.dispatchValve.decrement();
        } finally {
            messageEvaluationContext.clear();
            this.dispatchValve.decrement();
        }
    }

    @Override // org.activemq.broker.region.Destination
    public MessageStore getMessageStore() {
        return this.store;
    }
}
