package org.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.activemq.broker.ConnectionContext;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQQueue;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.memory.UsageManager;
import org.activemq.transaction.Synchronization;

/* loaded from: input_file:org/activemq/broker/region/TopicSubscription.class */
/**
 * Subscription that dispatches messages to one consumer through its connection while keeping
 * the number of outstanding messages below the consumer's prefetch size.
 *
 * <p>The prefetch window is treated as full when
 * {@code dispatched - delivered >= info.getPrefetchSize()}. Messages that arrive while the
 * window is full are parked in {@link #matched} and are sent, oldest first, from
 * {@link #acknowledge} as soon as an acknowledgement makes the window non-full again.
 *
 * <p>NOTE(review): only the transaction {@code afterCommit} callback synchronizes on this
 * subscription; no other method that touches the counters or {@link #matched} takes that
 * monitor. Presumably callers serialize access to a subscription - confirm before relying on it.
 */
public class TopicSubscription extends AbstractSubscription {
    /** {@link MessageReference}s waiting for room in the prefetch window, oldest first. */
    protected final LinkedList matched = new LinkedList();
    /** Queue named {@code "ActiveMQ.DLQ"}; not used by any method of this class. */
    protected final ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
    /** Usage manager supplied at construction; not used by any method of this class. */
    protected final UsageManager usageManager;
    /**
     * Incremented for every message handed to the connection; reduced by the message count of
     * each standard/poison ack (at commit time when the ack is transactional).
     */
    protected int dispatched = 0;
    /**
     * Increased by delivered-acks and, ahead of commit, by transactional standard/poison acks;
     * reduced (never below zero) whenever a standard/poison ack is applied.
     */
    protected int delivered = 0;

    /**
     * Creates the subscription.
     *
     * @param connectionContext context of the consumer's connection, passed to the superclass
     * @param consumerInfo      consumer description, passed to the superclass
     * @param usageManager      stored in {@link #usageManager}
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public TopicSubscription(ConnectionContext connectionContext, ConsumerInfo consumerInfo, UsageManager usageManager) throws InvalidSelectorException {
        super(connectionContext, consumerInfo);
        this.usageManager = usageManager;
    }

    /**
     * Offers a message to this subscription. The message's reference count is incremented
     * first; the message is then dispatched immediately if the prefetch window has room, or
     * appended to {@link #matched} otherwise.
     *
     * @param messageReference the message to deliver
     * @throws IOException if the dispatch fails
     */
    @Override
    public void add(MessageReference messageReference) throws InterruptedException, IOException {
        messageReference.incrementReferenceCount();
        if (isFull()) {
            this.matched.addLast(messageReference);
        } else {
            dispatch(messageReference);
        }
    }

    /**
     * Applies a consumer acknowledgement to the prefetch counters and, if the window was full
     * on entry and is no longer full afterwards, dispatches parked messages.
     *
     * <ul>
     *   <li>Standard or poison ack outside a transaction: {@code dispatched} is reduced by the
     *       ack's message count and {@code delivered} is reduced by it, clamped at zero.</li>
     *   <li>Standard or poison ack inside a transaction: {@code delivered} is increased right
     *       away, and the adjustment described above is deferred to {@code afterCommit}.</li>
     *   <li>Delivered ack: {@code delivered} is increased by the ack's message count.</li>
     * </ul>
     *
     * @param connectionContext context used to detect and reach the current transaction
     * @param messageAck        the acknowledgement to apply
     * @throws JMSException if the ack is neither a standard, poison nor delivered ack
     * @throws Throwable    anything raised while registering the synchronization or dispatching
     */
    @Override
    public void acknowledge(final ConnectionContext connectionContext, final MessageAck messageAck) throws Throwable {
        // Sample the window state before the counters move so we can detect a full -> not-full transition.
        final boolean wasFull = isFull();

        if (messageAck.isStandardAck() || messageAck.isPoisonAck()) {
            if (connectionContext.isInTransaction()) {
                this.delivered += messageAck.getMessageCount();
                connectionContext.getTransaction().addSynchronization(new Synchronization() {
                    @Override
                    public void afterCommit() throws Throwable {
                        synchronized (TopicSubscription.this) {
                            TopicSubscription.this.dispatched -= messageAck.getMessageCount();
                            TopicSubscription.this.delivered =
                                    Math.max(0, TopicSubscription.this.delivered - messageAck.getMessageCount());
                        }
                    }
                });
            } else {
                this.dispatched -= messageAck.getMessageCount();
                this.delivered = Math.max(0, this.delivered - messageAck.getMessageCount());
            }
        } else if (messageAck.isDeliveredAck()) {
            this.delivered += messageAck.getMessageCount();
        } else {
            throw new JMSException("Invalid acknowledgment: " + messageAck);
        }

        if (wasFull && !isFull()) {
            dispatchMatched();
        }
    }

    /** Returns whether {@code dispatched - delivered} has reached the consumer's prefetch size. */
    private boolean isFull() {
        return this.dispatched - this.delivered >= this.info.getPrefetchSize();
    }

    /**
     * Sends parked messages in arrival order until the list is empty or the window is full.
     * Each message is removed from {@link #matched} before it is dispatched, so a message whose
     * dispatch throws is not retried.
     */
    private void dispatchMatched() throws IOException {
        while (!this.matched.isEmpty() && !isFull()) {
            dispatch((MessageReference) this.matched.removeFirst());
        }
    }

    /**
     * Wraps the message in a {@link MessageDispatch} addressed to this consumer, counts it as
     * dispatched and hands it to the connection.
     *
     * <p>The reference taken in {@link #add} is released by a callback attached to the dispatch
     * in async mode, or directly after the send in sync mode.
     */
    private void dispatch(final MessageReference messageReference) throws IOException {
        MessageDispatch messageDispatch = new MessageDispatch();
        // The reference is cast straight to Message, so a non-Message reference fails here with ClassCastException.
        messageDispatch.setMessage((Message) messageReference);
        messageDispatch.setConsumerId(this.info.getConsumerId());
        messageDispatch.setDestination(messageReference.getRegionDestination().getActiveMQDestination());
        this.dispatched++;

        if (this.info.isDispatchAsync()) {
            // Presumably the connection runs this callback once the dispatch has been processed - confirm.
            messageDispatch.setConsumer(new Runnable() {
                @Override
                public void run() {
                    messageReference.decrementReferenceCount();
                }
            });
            this.context.getConnection().dispatchAsync(messageDispatch);
        } else {
            try {
                this.context.getConnection().dispatchSync(messageDispatch);
            } finally {
                // Release the reference even when the send throws, so a failed dispatch does not leak it.
                messageReference.decrementReferenceCount();
            }
        }
    }

    /** Returns a one-line summary: consumer id, destination count, both counters and backlog size. */
    @Override
    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId()
                + ", destinations=" + this.destinations.size()
                + ", dispatched=" + this.dispatched
                + ", delivered=" + this.delivered
                + ", matched=" + this.matched.size();
    }
}
