package org.codehaus.activemq.broker.impl;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.Iterator;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.ActiveMQBroker;
import org.codehaus.activemq.broker.ActiveMQBrokerClient;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.TransactionInfo;
import org.codehaus.activemq.transport.IdGenerator;
import org.codehaus.activemq.transport.TransportChannel;

/* loaded from: input_file:org/codehaus/activemq/broker/impl/ActiveMQBrokerClientImpl.class */
public class ActiveMQBrokerClientImpl implements ActiveMQBrokerClient, ExceptionListener, PacketListener {
    private static final Log log;
    private ActiveMQBroker broker;
    private TransportChannel channel;
    private ConnectionInfo connectionInfo;
    private IdGenerator packetIdGenerator = new IdGenerator();
    private SynchronizedBoolean closed = new SynchronizedBoolean(false);
    private CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    private CopyOnWriteArrayList producers = new CopyOnWriteArrayList();
    private CopyOnWriteArrayList transactions = new CopyOnWriteArrayList();
    private CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
    static Class class$org$codehaus$activemq$broker$impl$ActiveMQBrokerClientImpl;

    private void send(Packet packet) {
        if (this.closed.get()) {
            return;
        }
        try {
            this.channel.asyncSend(packet);
        } catch (JMSException e) {
            log.warn(new StringBuffer().append(this).append(" caught exception ").toString(), e);
            close();
        }
    }

    @Override // org.codehaus.activemq.broker.ActiveMQBrokerClient
    public void initialize(ActiveMQBroker activeMQBroker, TransportChannel transportChannel) {
        this.broker = activeMQBroker;
        this.channel = transportChannel;
        this.channel.setPacketListener(this);
        this.channel.setExceptionListener(this);
        log.trace("broker client initialized");
    }

    public void onException(JMSException jMSException) {
        log.warn(new StringBuffer().append(this).append(" caught exception ").toString(), jMSException);
        close();
    }

    public String toString() {
        return new StringBuffer().append(new StringBuffer().append("broker-client: ").append(this.connectionInfo == null ? "" : this.connectionInfo.getClientId()).toString()).append(": ").append(this.channel).toString();
    }

    private void close() {
        if (this.closed.commit(false, true)) {
            try {
                Iterator it = this.consumers.iterator();
                while (it.hasNext()) {
                    ConsumerInfo consumerInfo = (ConsumerInfo) it.next();
                    consumerInfo.setStarted(false);
                    this.broker.deregisterMessageConsumer(this, consumerInfo);
                }
                Iterator it2 = this.producers.iterator();
                while (it2.hasNext()) {
                    ProducerInfo producerInfo = (ProducerInfo) it2.next();
                    producerInfo.setStarted(false);
                    this.broker.deregisterMessageProducer(this, producerInfo);
                }
                Iterator it3 = this.sessions.iterator();
                while (it3.hasNext()) {
                    SessionInfo sessionInfo = (SessionInfo) it3.next();
                    sessionInfo.setStarted(false);
                    this.broker.deregisterSession(this, sessionInfo);
                }
                Iterator it4 = this.transactions.iterator();
                while (it4.hasNext()) {
                    this.broker.rollbackTransaction(this, it4.next().toString());
                }
                this.broker.deregisterBrokerclient(this);
                log.info(new StringBuffer().append(this).append(" has stopped").toString());
                this.consumers.clear();
                this.producers.clear();
                this.transactions.clear();
                this.sessions.clear();
            } catch (JMSException e) {
                log.warn("failed to de-register broker client", e);
            }
        }
    }

    @Override // org.codehaus.activemq.broker.ActiveMQBrokerClient
    public void dispatch(ActiveMQMessage activeMQMessage) {
        send(activeMQMessage);
    }

    @Override // org.codehaus.activemq.message.PacketListener
    public void consume(Packet packet) {
        if (this.closed.get()) {
            return;
        }
        JMSException jMSException = null;
        try {
            if (packet.isJMSMessage()) {
                ActiveMQMessage activeMQMessage = (ActiveMQMessage) packet;
                if (this.connectionInfo != null) {
                    activeMQMessage.setProducerID(this.connectionInfo.getClientId());
                } else {
                    log.warn("No connection info available!");
                }
                consumeActiveMQMessage(activeMQMessage);
            } else if (packet.getPacketType() == 15) {
                consumeMessageAck((MessageAck) packet);
            } else if (packet.getPacketType() == 19) {
                consumeTransactionInfo((TransactionInfo) packet);
            } else if (packet.getPacketType() == 17) {
                consumeConsumerInfo((ConsumerInfo) packet);
            } else if (packet.getPacketType() == 18) {
                consumeProducerInfo((ProducerInfo) packet);
            } else if (packet.getPacketType() == 22) {
                consumeSessionInfo((SessionInfo) packet);
            } else if (packet.getPacketType() == 21) {
                registerConnectionInfo((ConnectionInfo) packet);
            } else if (packet.getPacketType() == 23) {
                this.broker.durableUnsubscribe(this, (DurableUnsubscribe) packet);
            }
        } catch (JMSException e) {
            jMSException = e;
            log.info(new StringBuffer().append("caught exception consuming packet: ").append(packet).toString(), e);
        }
        if (packet.isReceiptRequired()) {
            Receipt receipt = new Receipt();
            receipt.setId(this.packetIdGenerator.generateId());
            receipt.setCorrelationId(packet.getId());
            receipt.setJmsException(jMSException);
            send(receipt);
        }
    }

    private void consumeActiveMQMessage(ActiveMQMessage activeMQMessage) throws JMSException {
        if (activeMQMessage.isPartOfTransaction()) {
            this.broker.sendTransactedMessage(this, activeMQMessage.getTransactionId(), activeMQMessage);
        } else {
            this.broker.sendMessage(this, activeMQMessage);
        }
    }

    private void consumeMessageAck(MessageAck messageAck) throws JMSException {
        if (messageAck.isPartOfTransaction()) {
            this.broker.acknowledgeTransactedMessage(this, messageAck.getTransactionId(), messageAck);
        } else {
            this.broker.acknowledgeMessage(this, messageAck);
        }
    }

    private void consumeTransactionInfo(TransactionInfo transactionInfo) throws JMSException {
        if (transactionInfo.getType() == 101) {
            this.sessions.add(transactionInfo.getTransactionId());
            this.broker.startTransaction(this, transactionInfo.getTransactionId());
            return;
        }
        if (transactionInfo.getType() == 105) {
            this.broker.rollbackTransaction(this, transactionInfo.getTransactionId());
        } else if (transactionInfo.getType() == 103) {
            this.broker.commitTransaction(this, transactionInfo.getTransactionId());
        }
        this.sessions.remove(transactionInfo.getTransactionId());
    }

    private void consumeConsumerInfo(ConsumerInfo consumerInfo) throws JMSException {
        if (consumerInfo.isStarted()) {
            this.consumers.add(consumerInfo);
            this.broker.registerMessageConsumer(this, consumerInfo);
        } else {
            this.consumers.remove(consumerInfo);
            this.broker.deregisterMessageConsumer(this, consumerInfo);
        }
    }

    private void consumeProducerInfo(ProducerInfo producerInfo) throws JMSException {
        if (producerInfo.isStarted()) {
            this.producers.add(producerInfo);
            this.broker.registerMessageProducer(this, producerInfo);
        } else {
            this.producers.remove(producerInfo);
            this.broker.deregisterMessageProducer(this, producerInfo);
        }
    }

    private void consumeSessionInfo(SessionInfo sessionInfo) throws JMSException {
        if (sessionInfo.isStarted()) {
            this.sessions.add(sessionInfo);
            this.broker.registerSession(this, sessionInfo);
        } else {
            this.sessions.remove(sessionInfo);
            this.broker.deregisterSession(this, sessionInfo);
        }
    }

    public void registerConnectionInfo(ConnectionInfo connectionInfo) throws JMSException {
        this.broker.registerBrokerclient(this, connectionInfo);
        this.connectionInfo = connectionInfo;
        log.info(new StringBuffer().append(this).append(" has started").toString());
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$codehaus$activemq$broker$impl$ActiveMQBrokerClientImpl == null) {
            cls = class$("org.codehaus.activemq.broker.impl.ActiveMQBrokerClientImpl");
            class$org$codehaus$activemq$broker$impl$ActiveMQBrokerClientImpl = cls;
        } else {
            cls = class$org$codehaus$activemq$broker$impl$ActiveMQBrokerClientImpl;
        }
        log = LogFactory.getLog(cls);
    }
}
