package io.jboot.core.mq.qpidmq;

import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.core.mq.Jbootmq;
import io.jboot.core.mq.JbootmqBase;
import io.jboot.exception.JbootException;
import io.jboot.utils.ArrayUtils;
import io.jboot.utils.StringUtils;
import io.jboot.web.session.JbootSessionConfig;
import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.Connection;

/* loaded from: input_file:io/jboot/core/mq/qpidmq/JbootQpidmqImpl.class */
/**
 * {@link Jbootmq} implementation that talks to a Qpid broker through the Qpid JMS client.
 *
 * <p>On construction it opens a single connection and, for every configured channel, starts one
 * receiver thread for the channel's queue address and one for its topic address. Received
 * messages are handed to {@code notifyListeners(channel, payload)}.
 *
 * <p>When the serializer is enabled, payloads travel as {@link BytesMessage}s whose length is
 * carried in the {@code data-len} int property; otherwise they travel as {@link TextMessage}s.
 */
public class JbootQpidmqImpl extends JbootmqBase implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootQpidmqImpl.class);

    /** Message property that carries the byte length of a serialized payload. */
    private static final String DATA_LEN_PROPERTY = "data-len";

    /** Time-to-live applied to every produced message, in milliseconds. */
    private static final long MESSAGE_TTL_MILLIS = 30000L;

    private Connection connection;
    private final boolean serializerEnable;

    /**
     * Blocking receive loop for a single consumer; every decoded payload is forwarded to the
     * listeners registered for {@code channel}.
     */
    public class ReceiveMsgThread implements Runnable {
        private final MessageConsumer consumer;
        private final String channel;
        private final boolean serializerEnable;

        /**
         * @param messageConsumer consumer to poll
         * @param str             channel name reported to the listeners
         * @param z               {@code true} if payloads are serialized bytes, {@code false} if text
         */
        public ReceiveMsgThread(MessageConsumer messageConsumer, String str, boolean z) {
            this.consumer = messageConsumer;
            this.channel = str;
            this.serializerEnable = z;
        }

        /**
         * Receives messages until the consumer is closed or {@code receive()} fails.
         * A failure while decoding or dispatching one message is logged and does not stop the loop.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                Message message;
                try {
                    message = this.consumer.receive();
                } catch (Exception e) {
                    // The consumer itself is broken (e.g. connection lost/closed): stop polling.
                    JbootQpidmqImpl.LOG.error(e.toString(), e);
                    return;
                }
                if (message == null) {
                    // Per the JMS contract a blocking receive() only yields null once the
                    // consumer has been closed, so there is nothing left to wait for.
                    return;
                }
                try {
                    Object payload = decode(message);
                    if (payload != null) {
                        JbootQpidmqImpl.this.notifyListeners(this.channel, payload);
                    }
                } catch (Exception e) {
                    // One malformed message or failing listener must not kill the receiver.
                    JbootQpidmqImpl.LOG.error(e.toString(), e);
                }
            }
        }

        /**
         * Extracts the payload from a received message.
         *
         * @return the decoded payload, or {@code null} if the byte payload is shorter or longer
         *         than the advertised {@code data-len}
         * @throws Exception if the message is of an unexpected type, lacks a usable
         *                   {@code data-len} property, or cannot be deserialized
         */
        private Object decode(Message message) throws Exception {
            if (!this.serializerEnable) {
                return ((TextMessage) message).getText();
            }
            BytesMessage bytesMessage = (BytesMessage) message;
            int expectedLength = bytesMessage.getIntProperty(DATA_LEN_PROPERTY);
            byte[] data = new byte[expectedLength];
            int actualLength = bytesMessage.readBytes(data);
            if (actualLength != expectedLength) {
                JbootQpidmqImpl.LOG.warn("discarding qpidmq message on channel " + this.channel
                        + ": expected " + expectedLength + " bytes but read " + actualLength);
                return null;
            }
            return Jboot.me().getSerializer().deserialize(data);
        }
    }

    /**
     * Connects to the broker described by {@code JbootQpidmqConfig} and starts the receiver threads.
     *
     * @throws JbootException if the connection cannot be opened or the consumers cannot be created
     */
    public JbootQpidmqImpl() {
        this.serializerEnable = ((JbootQpidmqConfig) Jboot.config(JbootQpidmqConfig.class)).isSerializerEnable();
        try {
            this.connection = new AMQConnection(getConnectionUrl());
            this.connection.start();
            startReceiveMsgThread();
        } catch (Exception e) {
            // Do not leave a half-initialised connection (and its receiver threads) behind.
            closeConnectionQuietly();
            throw new JbootException("can not connection qpidmq server", e);
        }
    }

    /** Sends {@code message} to the queue address derived from {@code toChannel}. */
    @Override // io.jboot.core.mq.Jbootmq
    public void enqueue(Object message, String toChannel) {
        sendMsg(getQueueAddr(toChannel), message);
    }

    /** Sends {@code message} to the topic address derived from {@code toChannel}. */
    @Override // io.jboot.core.mq.Jbootmq
    public void publish(Object message, String toChannel) {
        sendMsg(getTopicAddr(toChannel), message);
    }

    /**
     * Sends one message to the given Qpid address.
     *
     * <p>A dedicated session is created for the send and closed afterwards. Failures are logged,
     * not propagated.
     *
     * @param addr    Qpid address string of the destination
     * @param message payload; must be a {@link String} when the serializer is disabled
     */
    public void sendMsg(String addr, Object message) {
        Session session = null;
        try {
            session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(new AMQAnyDestination(addr));
            producer.setTimeToLive(MESSAGE_TTL_MILLIS);

            Message jmsMessage;
            if (this.serializerEnable) {
                byte[] data = Jboot.me().getSerializer().serialize(message);
                BytesMessage bytesMessage = session.createBytesMessage();
                // The receiver uses this property to size its read buffer.
                bytesMessage.setIntProperty(DATA_LEN_PROPERTY, data.length);
                bytesMessage.writeBytes(data);
                jmsMessage = bytesMessage;
            } else {
                jmsMessage = session.createTextMessage((String) message);
            }
            producer.send(jmsMessage);
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        } finally {
            // Closing the session also releases the producer created from it.
            closeQuietly(session);
        }
    }

    /**
     * Builds the Qpid connection URL from {@code JbootQpidmqConfig}: credentials, virtual host and
     * a round-robin failover broker list made of the comma-separated configured hosts.
     */
    private String getConnectionUrl() {
        JbootQpidmqConfig config = (JbootQpidmqConfig) Jboot.config(JbootQpidmqConfig.class);
        StringBuilder url = new StringBuilder("amqp://");
        url.append(config.getUsername()).append(":").append(config.getPassword()).append("@");
        // NOTE(review): presumably this constant is just "/" (the separator before the virtual
        // host) and an unrelated session constant ended up here - confirm and replace it.
        url.append(JbootSessionConfig.DEFAULT_COOKIE_CONTEXT_PATH);
        url.append(config.getVirtualHost());
        url.append("?failover='roundrobin'");
        url.append("&brokerlist='");
        for (String host : config.getHost().split(",")) {
            if (!StringUtils.isBlank(host)) {
                url.append("tcp://").append(host).append(";");
            }
        }
        url.append("'");
        return url.toString();
    }

    /** Returns the address string for the queue named {@code channel}, created on demand. */
    private String getQueueAddr(String channel) {
        return "ADDR:" + channel + ";{create:always}";
    }

    /** Returns the address string for subject {@code channel} on the {@code amq.topic} exchange. */
    private String getTopicAddr(String channel) {
        return "ADDR:amq.topic/" + channel;
    }

    /**
     * Starts a queue receiver and a topic receiver for every configured channel; does nothing
     * when no channels are configured.
     */
    private void startReceiveMsgThread() throws Exception {
        if (ArrayUtils.isNullOrEmpty(this.channels)) {
            return;
        }
        for (String channel : this.channels) {
            startConsumer(getQueueAddr(channel), channel);
            startConsumer(getTopicAddr(channel), channel);
        }
    }

    /**
     * Creates a consumer for {@code address} and starts a thread polling it.
     * Each consumer gets its own session because a JMS session must not be used by more than
     * one thread at a time and every receiver blocks in {@code receive()} on its own thread.
     */
    private void startConsumer(String address, String channel) throws Exception {
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(new AMQAnyDestination(address));
        new Thread(new ReceiveMsgThread(consumer, channel, this.serializerEnable)).start();
    }

    /** Closes {@code session} if it is non-null, logging instead of propagating any failure. */
    private void closeQuietly(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        }
    }

    /** Closes the connection if one was opened, logging instead of propagating any failure. */
    private void closeConnectionQuietly() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        }
    }
}
