package com.github.combinedmq.activemq;

import com.github.combinedmq.connection.ConnectionFactory;
import com.github.combinedmq.consumer.AbstractConsumer;
import com.github.combinedmq.exception.MqException;
import com.github.combinedmq.message.MessageExecutor;
import com.github.combinedmq.message.MessageListener;
import com.github.combinedmq.message.Queue;
import com.github.combinedmq.util.UnsafeUtils;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteSequence;
import sun.misc.Unsafe;

/* loaded from: input_file:com/github/combinedmq/activemq/ActiveMqConsumer.class */
/**
 * Consumer that subscribes the listeners registered on this instance to
 * ActiveMQ queues or topics and forwards every received message body to a
 * {@link MessageExecutor}.
 *
 * <p>Sessions are opened in client-acknowledge mode; each message is
 * acknowledged after its handler has finished, whether or not the handler
 * failed.
 */
public class ActiveMqConsumer extends AbstractConsumer {
    /** Lazily obtained on the first {@link #listen()} call and reused afterwards. */
    private Connection connection;
    /** Used to read the raw {@code content} field of received ActiveMQ messages. */
    private final Unsafe unsafe;

    /**
     * Creates a consumer backed by the given connection factory.
     *
     * @param connectionFactory factory the JMS connection is obtained from in {@link #listen()}
     */
    public ActiveMqConsumer(ConnectionFactory connectionFactory) {
        super(connectionFactory);
        this.unsafe = UnsafeUtils.getUnsafe();
    }

    /**
     * Starts the connection if necessary and registers one JMS consumer per
     * configured {@link MessageListener}. For every queue a single
     * {@link MessageExecutor} is created, sized by the number of listeners of
     * that queue, and the first listener of the queue is installed on it.
     *
     * @throws MqException if the connection, a session, a destination or a
     *         consumer cannot be created, or the message content field cannot
     *         be resolved
     */
    @Override // com.github.combinedmq.consumer.Consumer
    public void listen() throws MqException {
        try {
            // Resolve the field offset once instead of on every message; a
            // missing field now fails here and is reported as MqException.
            final long contentOffset =
                    this.unsafe.objectFieldOffset(Message.class.getDeclaredField("content"));

            if (null == this.connection) {
                this.connection = (Connection) getConnectionFactory().getConnection().getTargetConnection();
                this.connection.start();
            }

            for (Map.Entry<Queue, List<MessageListener>> entry : getQueueListener().entrySet()) {
                final Queue key = entry.getKey();
                final List<MessageListener> listeners = entry.getValue();
                final MessageExecutor messageExecutor = new MessageExecutor("activemq", listeners.size());
                for (MessageListener messageListener : listeners) {
                    if (null == messageExecutor.getMessageListener()) {
                        messageExecutor.setMessageListener(messageListener);
                    }
                    // Non-transacted session; messages are acknowledged explicitly in handleMessage.
                    Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    javax.jms.Destination destination = createDestination(session, key);
                    session.createConsumer(destination)
                            .setMessageListener(message -> handleMessage(messageExecutor, contentOffset, message));
                }
            }
        } catch (Exception e) {
            throw new MqException(e);
        }
    }

    /**
     * Maps the queue's type to the matching JMS destination (queue or topic).
     *
     * @throws IllegalStateException if the queue type is not one of the handled values
     */
    private javax.jms.Destination createDestination(Session session, Queue queue) throws Exception {
        switch (queue.getType()) {
            case POINT_TO_POINT:
                return session.createQueue(queue.getQueueName());
            case PUBLISH_SUBSCRIBE:
                return session.createTopic(queue.getQueueName());
            default:
                // Fail with a clear message instead of passing a null destination to createConsumer.
                throw new IllegalStateException("Unsupported queue type: " + queue.getType());
        }
    }

    /**
     * Hands the raw content bytes of one received message to the executor,
     * waits for the result, and then acknowledges the message. Handler
     * failures are printed and do not prevent the acknowledgement.
     *
     * @throws RuntimeException wrapping any exception raised by the acknowledgement
     */
    private void handleMessage(MessageExecutor messageExecutor, long contentOffset, javax.jms.Message message) {
        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message;
        try {
            ByteSequence content = (ByteSequence) this.unsafe.getObject(bytesMessage, contentOffset);
            messageExecutor.onMessage(new ActiveMqMessage(content.getData())).get();
        } catch (Throwable th) {
            // Best effort: a failing handler must not stop the consumer or block the acknowledgement.
            th.printStackTrace();
        } finally {
            try {
                bytesMessage.acknowledge();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
