package org.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.policy.DispatchPolicy;
import org.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ConsumerId;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageId;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.memory.UsageManager;
import org.activemq.store.MessageRecoveryListener;
import org.activemq.store.MessageStore;
import org.activemq.thread.TaskRunnerFactory;
import org.activemq.thread.Valve;
import org.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/activemq/broker/region/Queue.class */
public class Queue implements Destination {
    private final Log log;
    protected final ActiveMQDestination destination;
    protected final UsageManager usageManager;
    private Subscription exclusiveOwner;
    protected final MessageStore store;
    protected int highestSubscriptionPriority;
    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    protected final LinkedList messages = new LinkedList();
    protected final Valve dispatchValve = new Valve(true);
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    private final ConcurrentHashMap messageGroupOwners = new ConcurrentHashMap();
    protected long garbageSize = 0;
    protected long garbageSizeBeforeCollection = 1000;
    private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();

    public Queue(ActiveMQDestination activeMQDestination, UsageManager usageManager, MessageStore messageStore, DestinationStatistics destinationStatistics, TaskRunnerFactory taskRunnerFactory) throws Throwable {
        this.destination = activeMQDestination;
        this.usageManager = usageManager;
        this.store = messageStore;
        this.destinationStatistics.setParent(destinationStatistics);
        this.log = LogFactory.getLog(new StringBuffer().append(getClass().getName()).append(ActiveMQDestination.PATH_SEPERATOR).append(activeMQDestination.getPhysicalName()).toString());
        if (messageStore != null) {
            messageStore.recover(new MessageRecoveryListener(this) { // from class: org.activemq.broker.region.Queue.1
                private final Queue this$0;

                {
                    this.this$0 = this;
                }

                @Override // org.activemq.store.MessageRecoveryListener
                public void recoverMessage(Message message) {
                    message.setRegionDestination(this.this$0);
                    MessageReference createMessageReference = this.this$0.createMessageReference(message);
                    this.this$0.messages.add(createMessageReference);
                    createMessageReference.decrementReferenceCount();
                    this.this$0.destinationStatistics.getMessages().increment();
                }
            });
        }
    }

    @Override // org.activemq.broker.region.Destination
    public synchronized boolean lock(MessageReference messageReference, Subscription subscription) {
        if (this.exclusiveOwner == subscription) {
            return true;
        }
        if (this.exclusiveOwner != null || subscription.getConsumerInfo().getPriority() != this.highestSubscriptionPriority) {
            return false;
        }
        if (!subscription.getConsumerInfo().isExclusive()) {
            return true;
        }
        this.exclusiveOwner = subscription;
        return true;
    }

    @Override // org.activemq.broker.region.Destination
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Throwable {
        subscription.add(connectionContext, this);
        this.destinationStatistics.getConsumers().increment();
        this.dispatchValve.turnOff();
        MessageEvaluationContext messageEvaluationContext = connectionContext.getMessageEvaluationContext();
        try {
            this.consumers.add(subscription);
            this.highestSubscriptionPriority = calcHighestSubscriptionPriority();
            messageEvaluationContext.setDestination(this.destination);
            synchronized (this.messages) {
                Iterator it = this.messages.iterator();
                while (it.hasNext()) {
                    IndirectMessageReference indirectMessageReference = (IndirectMessageReference) it.next();
                    if (!indirectMessageReference.isDropped()) {
                        try {
                            messageEvaluationContext.setMessageReference(indirectMessageReference);
                            if (subscription.matches(indirectMessageReference, messageEvaluationContext)) {
                                subscription.add(indirectMessageReference);
                            }
                        } catch (IOException e) {
                            this.log.warn(new StringBuffer().append("Could not load message: ").append(e).toString(), e);
                        }
                    }
                }
            }
        } finally {
            messageEvaluationContext.clear();
            this.dispatchValve.turnOn();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.activemq.broker.region.Destination
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription) throws Throwable {
        this.destinationStatistics.getConsumers().decrement();
        this.dispatchValve.turnOff();
        try {
            this.consumers.remove(subscription);
            subscription.remove(connectionContext, this);
            this.highestSubscriptionPriority = calcHighestSubscriptionPriority();
            boolean z = false;
            if (this.exclusiveOwner == subscription) {
                this.exclusiveOwner = null;
                z = true;
            }
            HashSet hashSet = new HashSet();
            ConsumerId consumerId = subscription.getConsumerInfo().getConsumerId();
            Iterator it = this.messageGroupOwners.keySet().iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                if (((ConsumerId) this.messageGroupOwners.get(str)).equals(consumerId)) {
                    hashSet.add(str);
                    it.remove();
                }
            }
            synchronized (this.messages) {
                if (!subscription.getConsumerInfo().isBrowser()) {
                    MessageEvaluationContext messageEvaluationContext = connectionContext.getMessageEvaluationContext();
                    try {
                        messageEvaluationContext.setDestination(this.destination);
                        Iterator it2 = this.messages.iterator();
                        while (it2.hasNext()) {
                            IndirectMessageReference indirectMessageReference = (IndirectMessageReference) it2.next();
                            if (!indirectMessageReference.isDropped()) {
                                String groupID = indirectMessageReference.getGroupID();
                                if (indirectMessageReference.getLockOwner() == subscription || z || (groupID != null && hashSet.contains(groupID))) {
                                    indirectMessageReference.incrementRedeliveryCounter();
                                    indirectMessageReference.unlock();
                                    messageEvaluationContext.setMessageReference(indirectMessageReference);
                                    this.dispatchPolicy.dispatch(connectionContext, indirectMessageReference, messageEvaluationContext, this.consumers);
                                }
                            }
                        }
                        messageEvaluationContext.clear();
                    } catch (Throwable th) {
                        messageEvaluationContext.clear();
                        throw th;
                    }
                }
            }
        } finally {
            this.dispatchValve.turnOn();
        }
    }

    @Override // org.activemq.broker.region.Destination
    public void send(ConnectionContext connectionContext, Message message) throws Throwable {
        if (connectionContext.isProducerFlowControl()) {
            this.usageManager.waitForSpace();
        }
        message.setRegionDestination(this);
        if (this.store != null && message.isPersistent()) {
            this.store.addMessage(connectionContext, message);
        }
        MessageReference createMessageReference = createMessageReference(message);
        try {
            if (connectionContext.isInTransaction()) {
                connectionContext.getTransaction().addSynchronization(new Synchronization(this, connectionContext, createMessageReference, message) { // from class: org.activemq.broker.region.Queue.2
                    private final ConnectionContext val$context;
                    private final MessageReference val$node;
                    private final Message val$message;
                    private final Queue this$0;

                    {
                        this.this$0 = this;
                        this.val$context = connectionContext;
                        this.val$node = createMessageReference;
                        this.val$message = message;
                    }

                    @Override // org.activemq.transaction.Synchronization
                    public void afterCommit() throws Throwable {
                        this.this$0.dispatch(this.val$context, this.val$node, this.val$message);
                    }
                });
            } else {
                dispatch(connectionContext, createMessageReference, message);
            }
        } finally {
            createMessageReference.decrementReferenceCount();
        }
    }

    @Override // org.activemq.broker.region.Destination
    public void dispose(ConnectionContext connectionContext) throws IOException {
        if (this.store != null) {
            this.store.removeAllMessages(connectionContext);
        }
    }

    public void dropEvent() {
        this.destinationStatistics.getMessages().decrement();
        synchronized (this.messages) {
            this.garbageSize++;
            if (this.garbageSize > this.garbageSizeBeforeCollection) {
                gc();
            }
        }
    }

    @Override // org.activemq.broker.region.Destination
    public void gc() {
        synchronized (this.messages) {
            Iterator it = this.messages.iterator();
            while (it.hasNext()) {
                if (((IndirectMessageReference) it.next()).isDropped()) {
                    this.garbageSize--;
                    it.remove();
                }
            }
        }
    }

    @Override // org.activemq.broker.region.Destination
    public void acknowledge(ConnectionContext connectionContext, Subscription subscription, MessageAck messageAck, MessageReference messageReference) throws IOException {
        if (this.store == null || !messageReference.isPersistent()) {
            return;
        }
        this.store.removeMessage(connectionContext, messageAck);
    }

    @Override // org.activemq.broker.region.Destination
    public Message loadMessage(MessageId messageId) throws IOException {
        Message message = this.store.getMessage(messageId);
        if (message != null) {
            message.setRegionDestination(this);
        }
        return message;
    }

    public String toString() {
        return new StringBuffer().append("Queue: destination=").append(this.destination.getPhysicalName()).append(", subscriptions=").append(this.consumers.size()).append(", memory=").append(this.usageManager.getPercentUsage()).append("%, size=").append(this.messages.size()).append(", in flight groups=").append(this.messageGroupOwners.size()).toString();
    }

    @Override // org.activemq.Service
    public void start() throws Exception {
    }

    @Override // org.activemq.Service
    public void stop() throws Exception {
    }

    @Override // org.activemq.broker.region.Destination
    public ActiveMQDestination getActiveMQDestination() {
        return this.destination;
    }

    @Override // org.activemq.broker.region.Destination
    public UsageManager getUsageManager() {
        return this.usageManager;
    }

    @Override // org.activemq.broker.region.Destination
    public DestinationStatistics getDestinationStatistics() {
        return this.destinationStatistics;
    }

    public ConcurrentHashMap getMessageGroupOwners() {
        return this.messageGroupOwners;
    }

    public DispatchPolicy getDispatchPolicy() {
        return this.dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageReference createMessageReference(Message message) {
        return new IndirectMessageReference(this, message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispatch(ConnectionContext connectionContext, MessageReference messageReference, Message message) throws Throwable {
        this.dispatchValve.increment();
        MessageEvaluationContext messageEvaluationContext = connectionContext.getMessageEvaluationContext();
        try {
            this.destinationStatistics.getEnqueues().increment();
            this.destinationStatistics.getMessages().increment();
            synchronized (this.messages) {
                this.messages.add(messageReference);
            }
            if (this.consumers.isEmpty()) {
                return;
            }
            messageEvaluationContext.setDestination(this.destination);
            messageEvaluationContext.setMessageReference(messageReference);
            this.dispatchPolicy.dispatch(connectionContext, messageReference, messageEvaluationContext, this.consumers);
            messageEvaluationContext.clear();
            this.dispatchValve.decrement();
        } finally {
            messageEvaluationContext.clear();
            this.dispatchValve.decrement();
        }
    }

    private int calcHighestSubscriptionPriority() {
        byte b = -2147483648;
        Iterator it = this.consumers.iterator();
        while (it.hasNext()) {
            Subscription subscription = (Subscription) it.next();
            if (subscription.getConsumerInfo().getPriority() > b) {
                b = subscription.getConsumerInfo().getPriority();
            }
        }
        return b;
    }

    @Override // org.activemq.broker.region.Destination
    public MessageStore getMessageStore() {
        return this.store;
    }
}
