package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.List;
import java.util.ListIterator;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.service.impl.DefaultQueueList;

/* loaded from: input_file:org/codehaus/activemq/service/boundedvm/TransientQueueBoundedMessageContainer.class */
/**
 * A non-persistent queue container backed by a {@link MemoryBoundedQueue}.
 *
 * <p>Messages handed to {@link #enqueue(ActiveMQMessage)} are held in a memory
 * bounded queue and handed out, one at a time, to the registered
 * {@link TransientQueueSubscription}s by a dispatch loop ({@link #run()}) that is
 * executed on the supplied {@link Executor}. The loop only runs while the
 * container has been started and has at least one subscription.
 *
 * <p>The store-oriented {@link MessageContainer} operations (addMessage, delete,
 * getMessage, ...) are implemented as no-ops in this class.
 */
public class TransientQueueBoundedMessageContainer implements MessageContainer, Service, Runnable {
    /** Maximum time, in milliseconds, a single dequeue attempt waits for a message. */
    private static final long DEQUEUE_TIMEOUT_MILLIS = 2000L;

    /** Pause, in milliseconds, after an iteration that dispatched nothing. */
    private static final long IDLE_SLEEP_MILLIS = 250L;

    private final MemoryBoundedQueueManager queueManager;
    private final ActiveMQDestination destination;
    private final MemoryBoundedQueue queue;
    private final Executor threadPool;
    private final Log log;

    /**
     * Time at which the last subscription was removed, or 0 once a subscription
     * has been added. Written under the {@code subscriptions} monitor but read
     * without it by {@link #getIdleTimestamp()}, hence volatile.
     */
    private volatile long idleTimestamp;

    /** True between {@link #start()} and {@link #stop()}. */
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);

    /** True while a dispatch loop has been handed to the executor and should keep going. */
    private final SynchronizedBoolean running = new SynchronizedBoolean(false);

    private final DefaultQueueList subscriptions = new DefaultQueueList();

    /**
     * Creates a container for the given destination.
     *
     * @param executor executor used to run the dispatch loop
     * @param memoryBoundedQueueManager source of the bounded queues used by this
     *        container and by its subscriptions
     * @param activeMQDestination destination this container serves
     */
    public TransientQueueBoundedMessageContainer(Executor executor, MemoryBoundedQueueManager memoryBoundedQueueManager, ActiveMQDestination activeMQDestination) {
        this.threadPool = executor;
        this.queueManager = memoryBoundedQueueManager;
        this.destination = activeMQDestination;
        this.queue = memoryBoundedQueueManager.getMemoryBoundedQueue("TRANSIENT_QUEUE:-" + activeMQDestination.getPhysicalName());
        this.log = LogFactory.getLog("TransientQueueBoundedMessageContainer:- " + activeMQDestination);
    }

    /**
     * @return true if at least one subscription is registered
     */
    public boolean isActive() {
        return !subscriptions.isEmpty();
    }

    /**
     * @return true if the backing queue holds no messages
     */
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * @return the time (from {@link System#currentTimeMillis()}) at which the last
     *         subscription was removed, or 0 if a subscription has been added since
     *         (or none was ever removed)
     */
    public long getIdleTimestamp() {
        return idleTimestamp;
    }

    /**
     * Registers a consumer, returning the existing subscription if one with an
     * equal {@link ConsumerInfo} is already registered.
     *
     * <p>If the container is started and no dispatch loop is running, one is
     * submitted to the executor.
     *
     * @param filter filter handed to the new subscription
     * @param consumerInfo description of the consumer
     * @param brokerClient client the subscription dispatches to
     * @return the existing or newly created subscription
     * @throws JMSException if submitting the dispatch loop was interrupted; note
     *         that the subscription has already been added at that point
     */
    public TransientQueueSubscription addConsumer(Filter filter, ConsumerInfo consumerInfo, BrokerClient brokerClient) throws JMSException {
        TransientQueueSubscription subscription = findMatch(consumerInfo);
        if (subscription != null) {
            return subscription;
        }
        String subscriptionQueueName = brokerClient.toString() + consumerInfo.getConsumerId();
        subscription = new TransientQueueSubscription(brokerClient, queueManager.getMemoryBoundedQueue(subscriptionQueueName), filter, consumerInfo);
        synchronized (subscriptions) {
            idleTimestamp = 0L;
            subscriptions.add(subscription);
            if (started.get()) {
                launchDispatcher(" Failed to start running dispatch thread");
            }
        }
        return subscription;
    }

    /**
     * Removes the subscription matching the given consumer, if any, and returns
     * its undelivered messages to this container's queue flagged as redelivered.
     *
     * <p>When the last subscription goes away the dispatch loop is asked to stop
     * and the idle timestamp is set to the current time.
     *
     * @param consumerInfo description of the consumer to remove
     * @throws JMSException declared by the subscription lookup and close calls
     */
    public void removeConsumer(ConsumerInfo consumerInfo) throws JMSException {
        TransientQueueSubscription subscription = findMatch(consumerInfo);
        if (subscription == null) {
            return;
        }
        synchronized (subscriptions) {
            subscriptions.remove(subscription);
            if (subscriptions.isEmpty()) {
                running.commit(true, false);
                idleTimestamp = System.currentTimeMillis();
            }
        }
        // Walk the undelivered messages last-to-first and push each one with
        // enqueueFirstNoBlock; assuming that call inserts at the head, the
        // messages end up at the front of the queue in their original order.
        List undelivered = subscription.getUndeliveredMessages();
        ListIterator reverse = undelivered.listIterator(undelivered.size());
        while (reverse.hasPrevious()) {
            ActiveMQMessage message = (ActiveMQMessage) reverse.previous();
            message.setJMSRedelivered(true);
            queue.enqueueFirstNoBlock(message);
        }
        undelivered.clear();
        subscription.close();
    }

    /**
     * Marks the container as started and, if there is at least one subscription,
     * submits the dispatch loop to the executor. Does nothing if the container is
     * already started.
     *
     * @throws JMSException if submitting the dispatch loop was interrupted; the
     *         started flag is rolled back in that case so start() can be retried
     */
    @Override
    public void start() throws JMSException {
        if (!started.commit(false, true) || subscriptions.isEmpty()) {
            return;
        }
        try {
            launchDispatcher(" Failed to start");
        } catch (JMSException e) {
            // Do not report "started" for a start() call that failed.
            started.set(false);
            throw e;
        }
    }

    /**
     * Adds a message to the backing queue via {@code queue.enqueue}.
     *
     * @param activeMQMessage message to enqueue
     */
    public void enqueue(ActiveMQMessage activeMQMessage) {
        queue.enqueue(activeMQMessage);
    }

    /**
     * Adds a message to the backing queue via {@code queue.enqueueFirstNoBlock}.
     *
     * @param activeMQMessage message to enqueue
     */
    public void enqueueFirst(ActiveMQMessage activeMQMessage) {
        queue.enqueueFirstNoBlock(activeMQMessage);
    }

    /**
     * Clears the started and running flags, which makes the dispatch loop exit on
     * its next condition check, and clears the backing queue.
     */
    @Override
    public void stop() {
        started.set(false);
        running.set(false);
        queue.clear();
    }

    /**
     * Stops the container if it is started, closes the backing queue, closes every
     * registered subscription and finally empties the subscription list.
     *
     * @throws JMSException declared by the subscription close call
     */
    public void close() throws JMSException {
        if (started.get()) {
            stop();
        }
        queue.close();
        for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
            ((TransientQueueSubscription) entry.getElement()).close();
        }
        subscriptions.clear();
    }

    /**
     * Dispatch loop. While the container is started and running, each iteration:
     * <ol>
     * <li>dequeues one message (waiting up to two seconds) if there are
     * subscriptions;</li>
     * <li>drops it if it reports itself expired;</li>
     * <li>otherwise offers it to the first subscription that targets it and
     * reports it can accept messages, then rotates the subscription list;</li>
     * <li>if nothing was dispatched, puts the message back (at the front when some
     * subscription targeted it, otherwise via {@code enqueueNoBlock}) and
     * sleeps briefly.</li>
     * </ol>
     * Any exception stops the container (which also clears the queue) and ends
     * the loop.
     */
    @Override
    public void run() {
        while (started.get() && running.get()) {
            try {
                // Scoped to a single iteration on purpose: a message that was put
                // back on the queue must not be carried into the next pass, or an
                // iteration that skips the dequeue would enqueue it a second time.
                ActiveMQMessage message = null;
                boolean dispatched = false;
                boolean targeted = false;
                if (!subscriptions.isEmpty()) {
                    message = (ActiveMQMessage) queue.dequeue(DEQUEUE_TIMEOUT_MILLIS);
                    if (message != null) {
                        if (message.isExpired()) {
                            if (log.isDebugEnabled()) {
                                log.debug("expired message: " + message);
                            }
                            message = null;
                        } else {
                            for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
                                TransientQueueSubscription subscription = (TransientQueueSubscription) entry.getElement();
                                if (!subscription.isTarget(message)) {
                                    continue;
                                }
                                targeted = true;
                                if (subscription.canAcceptMessages()) {
                                    subscription.doDispatch(message);
                                    dispatched = true;
                                    // NOTE(review): presumably rotates so the next message
                                    // starts with a different subscriber - confirm against
                                    // DefaultQueueList.rotate().
                                    subscriptions.rotate();
                                    break;
                                }
                            }
                        }
                    }
                }
                if (!dispatched) {
                    if (message != null) {
                        if (targeted) {
                            // A subscriber wants it but could not take it right now.
                            queue.enqueueFirstNoBlock(message);
                        } else {
                            // No subscriber targets it at the moment.
                            queue.enqueueNoBlock(message);
                        }
                    }
                    if (running.get()) {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    }
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for the executor that owns this thread.
                    Thread.currentThread().interrupt();
                }
                stop();
                log.warn("stop dispatching", e);
                return;
            }
        }
    }

    /**
     * Submits the dispatch loop to the executor unless one is already flagged as
     * running.
     *
     * @param failureSuffix text appended to {@link #toString()} to form the
     *        exception message if submission is interrupted
     * @throws JMSException wrapping the InterruptedException from the executor
     */
    private void launchDispatcher(String failureSuffix) throws JMSException {
        synchronized (running) {
            if (!running.commit(false, true)) {
                return;
            }
            try {
                threadPool.execute(this);
            } catch (InterruptedException e) {
                // Nothing was submitted: undo the flag so a later attempt can
                // still launch the loop, and keep the interrupt visible.
                running.set(false);
                Thread.currentThread().interrupt();
                JMSException failure = new JMSException(toString() + failureSuffix);
                failure.setLinkedException(e);
                throw failure;
            }
        }
    }

    /**
     * Looks up the subscription whose {@link ConsumerInfo} equals the given one.
     *
     * @param consumerInfo consumer description to look for
     * @return the matching subscription, or null if there is none
     * @throws JMSException declared for callers; not thrown by the visible code
     */
    private TransientQueueSubscription findMatch(ConsumerInfo consumerInfo) throws JMSException {
        for (QueueListEntry entry = subscriptions.getFirstEntry(); entry != null; entry = subscriptions.getNextEntry(entry)) {
            TransientQueueSubscription candidate = (TransientQueueSubscription) entry.getElement();
            if (candidate.getConsumerInfo().equals(consumerInfo)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * @return the destination this container was created for
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * @return the physical name of this container's destination
     */
    @Override
    public String getDestinationName() {
        return destination.getPhysicalName();
    }

    /**
     * No-op in this container.
     *
     * @return always null
     */
    @Override
    public MessageIdentity addMessage(ActiveMQMessage activeMQMessage) throws JMSException {
        return null;
    }

    /** No-op in this container. */
    @Override
    public void delete(MessageIdentity messageIdentity, MessageAck messageAck) throws JMSException {
    }

    /**
     * No-op in this container.
     *
     * @return always null
     */
    @Override
    public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
        return null;
    }

    /** No-op in this container. */
    @Override
    public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
    }

    /** No-op in this container. */
    @Override
    public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck messageAck) throws JMSException {
    }

    /**
     * No-op in this container.
     *
     * @return always false
     */
    @Override
    public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
        return false;
    }

    /** Clears the backing queue. */
    protected void clear() {
        queue.clear();
    }

    /**
     * Removes from the backing queue every message that reports itself expired
     * relative to the time at which this method was entered.
     */
    public void removeExpiredMessages() {
        long now = System.currentTimeMillis();
        // Iterate over an array snapshot so that removing from the queue cannot
        // shift the elements being walked, whether or not getContents() returns
        // a live view of the queue.
        Object[] snapshot = queue.getContents().toArray();
        for (int i = 0; i < snapshot.length; i++) {
            ActiveMQMessage message = (ActiveMQMessage) snapshot[i];
            if (message.isExpired(now)) {
                queue.remove(message);
                if (log.isDebugEnabled()) {
                    log.debug("expired message: " + message);
                }
            }
        }
    }
}
