package net.ibizsys.central.plugin.activemq.eai;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import net.ibizsys.central.cloud.core.eai.SysEAIAgentRuntimeBase;
import net.ibizsys.runtime.SystemRuntimeException;
import net.ibizsys.runtime.util.DataTypeUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/ibizsys/central/plugin/activemq/eai/ActiveMQEAIAgentRuntime.class */
/**
 * EAI agent runtime that exchanges text messages with an ActiveMQ broker through JMS.
 *
 * <p>The agent keeps a single JMS {@link Connection} and a single JMS {@link Session}.
 * Producers and consumers are created lazily per destination name and cached. Whether a
 * destination name is resolved as a topic or as a queue is controlled by the topic-mode flag.
 */
public class ActiveMQEAIAgentRuntime extends SysEAIAgentRuntimeBase {
    private static final Log log = LogFactory.getLog(ActiveMQEAIAgentRuntime.class);

    /** Agent parameter name selecting topic mode (otherwise queues are used). */
    public static final String PARAM_TOPICMODE = "TOPICMODE";

    /** Agent parameter name selecting the JMS acknowledge mode of the session. */
    public static final String PARAM_ACKNOWLEDGEMODE = "ACKNOWLEDGEMODE";

    private Connection connection = null;
    private Session session = null;
    private boolean bTopicMode = false;
    // Session.AUTO_ACKNOWLEDGE has the value 1, which is the default this class has always used.
    private int nAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;

    /** Producers created from the current session, keyed by destination name. */
    private final Map<String, MessageProducer> messageProducerMap = new ConcurrentHashMap<>();

    /** Consumers created from the current session, keyed by destination name. */
    private final Map<String, MessageConsumer> messageConsumerMap = new ConcurrentHashMap<>();

    /**
     * Reads the topic-mode and acknowledge-mode settings, makes sure a connection and a session
     * exist, and, when the sync direction is {@code IN} or {@code INOUT}, registers a message
     * listener for every configured topic.
     *
     * <p>Each setting is looked up under {@code getConfigFolder() + ".topicmode"} /
     * {@code ".acknowledgemode"}; the corresponding agent parameter supplies the fallback value.
     *
     * @throws Exception if no connection or session is available after preparation, or if any
     *     underlying call fails
     */
    protected void onInit() throws Exception {
        setTopicMode(getSystemRuntimeSetting().getParam(getConfigFolder() + ".topicmode", DataTypeUtils.getBooleanValue(getAgentParam(PARAM_TOPICMODE, null), false).booleanValue()));
        setAcknowledgeMode(getSystemRuntimeSetting().getParam(getConfigFolder() + ".acknowledgemode", DataTypeUtils.getIntegerValue(getAgentParam(PARAM_ACKNOWLEDGEMODE, null), Session.AUTO_ACKNOWLEDGE).intValue()));
        super.onInit();

        // A connection/session may already have been injected through the setters; only
        // create one when none is present.
        if (getConnection(true) == null) {
            prepareConnection();
            if (getConnection(true) == null) {
                throw new Exception("连接对象无效");
            }
        }
        if (getSession(true) == null) {
            prepareSession();
            if (getSession(true) == null) {
                throw new Exception("会话对象无效");
            }
        }

        String syncDir = getPSSysDataSyncAgent().getSyncDir();
        boolean inbound = "INOUT".equals(syncDir) || "IN".equals(syncDir);
        if (!inbound) {
            return;
        }
        List<String> topics = getTopics();
        if (ObjectUtils.isEmpty(topics)) {
            return;
        }
        for (String topic : topics) {
            subscribe(topic);
        }
    }

    /**
     * Registers a listener on the consumer of {@code topic} that forwards the text of every
     * {@link TextMessage} to {@code recv}. Messages of any other type are ignored.
     */
    private void subscribe(String topic) throws Exception {
        getMessageConsumer(topic).setMessageListener(message -> {
            // instanceof is false for null, so no separate null check is needed.
            if (!(message instanceof TextMessage)) {
                return;
            }
            try {
                recv(topic, ((TextMessage) message).getText());
            } catch (JMSException e) {
                log.error(String.format("获取消息内容发生异常，%1$s", e.getMessage()), e);
            }
        });
    }

    /**
     * Returns the current connection.
     *
     * @throws SystemRuntimeException if no connection has been set
     */
    protected Connection getConnection() {
        return getConnection(false);
    }

    /**
     * Returns the current connection.
     *
     * @param z when {@code true}, {@code null} is returned if no connection has been set;
     *     when {@code false}, a {@link SystemRuntimeException} is thrown instead
     */
    protected Connection getConnection(boolean z) {
        if (this.connection != null || z) {
            return this.connection;
        }
        throw new SystemRuntimeException(getSystemRuntimeBase(), this, "未指定连接对象");
    }

    /** Stores the connection used by this agent. */
    protected void setConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Creates a connection to the configured service URL, using the client id and client secret
     * as credentials, starts it and stores it.
     *
     * @throws Exception if no service URL is configured or the connection cannot be created
     *     or started
     */
    protected void prepareConnection() throws Exception {
        String serviceUrl = getServiceUrl();
        if (!StringUtils.hasLength(serviceUrl)) {
            throw new Exception("未指定服务地址");
        }
        Connection created = new ActiveMQConnectionFactory(getClientId(), getClientSecret(), serviceUrl).createConnection();
        try {
            created.start();
        } catch (Exception e) {
            // The connection is not stored yet, so nobody else could close it: release it here.
            try {
                created.close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        setConnection(created);
    }

    /**
     * Closes the current connection, if any. The stored reference is cleared even when
     * {@code close()} fails, so a later initialisation does not reuse a broken connection.
     *
     * @throws Exception if closing the connection fails
     */
    protected void closeConnection() throws Exception {
        Connection current = getConnection(true);
        if (current == null) {
            return;
        }
        try {
            current.close();
        } finally {
            setConnection(null);
        }
    }

    /**
     * Returns the current session.
     *
     * @throws SystemRuntimeException if no session has been set
     */
    protected Session getSession() {
        return getSession(false);
    }

    /**
     * Returns the current session.
     *
     * @param z when {@code true}, {@code null} is returned if no session has been set;
     *     when {@code false}, a {@link SystemRuntimeException} is thrown instead
     */
    protected Session getSession(boolean z) {
        if (this.session != null || z) {
            return this.session;
        }
        throw new SystemRuntimeException(getSystemRuntimeBase(), this, "未指定会话对象");
    }

    /** Stores the session used by this agent. */
    protected void setSession(Session session) {
        this.session = session;
    }

    /**
     * Creates a session on the current connection, using {@code isTransacted()} and the
     * configured acknowledge mode, and stores it.
     *
     * @throws Exception if there is no connection or the session cannot be created
     */
    protected void prepareSession() throws Exception {
        Connection current = getConnection(true);
        if (current == null) {
            throw new Exception("未指定服务连接");
        }
        setSession(current.createSession(isTransacted(), getAcknowledgeMode()));
    }

    /**
     * Closes the current session, if any. The stored reference and the producer/consumer caches
     * are cleared even when {@code close()} fails: cached producers and consumers were created
     * from this session and must not be handed out once it is gone.
     *
     * @throws Exception if closing the session fails
     */
    protected void closeSession() throws Exception {
        Session current = getSession(true);
        if (current == null) {
            return;
        }
        try {
            current.close();
        } finally {
            setSession(null);
            this.messageProducerMap.clear();
            this.messageConsumerMap.clear();
        }
    }

    /**
     * Returns the cached producer for the destination {@code str}, creating and caching it on
     * first use. The destination is a topic in topic mode and a queue otherwise.
     *
     * @param str destination name
     * @throws Exception if there is no session or the producer cannot be created
     */
    protected MessageProducer getMessageProducer(String str) throws Exception {
        MessageProducer cached = this.messageProducerMap.get(str);
        if (cached != null) {
            return cached;
        }
        Session current = getSession(true);
        if (current == null) {
            throw new Exception("未指定连接会话");
        }
        MessageProducer created;
        if (isTopicMode()) {
            created = current.createProducer(current.createTopic(str));
        } else {
            created = current.createProducer(current.createQueue(str));
        }
        this.messageProducerMap.put(str, created);
        return created;
    }

    /**
     * Returns the cached consumer for the destination {@code str}, creating and caching it on
     * first use. The destination is a topic in topic mode and a queue otherwise.
     *
     * @param str destination name
     * @throws Exception if there is no session or the consumer cannot be created
     */
    protected MessageConsumer getMessageConsumer(String str) throws Exception {
        MessageConsumer cached = this.messageConsumerMap.get(str);
        if (cached != null) {
            return cached;
        }
        Session current = getSession(true);
        if (current == null) {
            throw new Exception("未指定连接会话");
        }
        MessageConsumer created;
        if (isTopicMode()) {
            created = current.createConsumer(current.createTopic(str));
        } else {
            created = current.createConsumer(current.createQueue(str));
        }
        this.messageConsumerMap.put(str, created);
        return created;
    }

    /** Returns {@code true} when destination names are resolved as topics rather than queues. */
    public boolean isTopicMode() {
        return this.bTopicMode;
    }

    /** Sets whether destination names are resolved as topics ({@code true}) or queues. */
    protected void setTopicMode(boolean z) {
        this.bTopicMode = z;
    }

    /** Returns the JMS acknowledge mode passed to {@code createSession}. */
    public int getAcknowledgeMode() {
        return this.nAcknowledgeMode;
    }

    /** Sets the JMS acknowledge mode passed to {@code createSession}. */
    public void setAcknowledgeMode(int i) {
        this.nAcknowledgeMode = i;
    }

    /**
     * Sends {@code str2} as a text message to the destination {@code str}, falling back to the
     * default topic when {@code str} is null or empty.
     *
     * @param str destination name, may be null or empty
     * @param str2 message text
     * @throws Throwable if no session is available or sending fails
     */
    protected void onSend(String str, String str2) throws Throwable {
        String destination = StringUtils.hasLength(str) ? str : getDefaultTopic();
        MessageProducer producer = getMessageProducer(destination);
        // Go through the accessor rather than the field so that a missing session is reported
        // as a descriptive SystemRuntimeException instead of a NullPointerException.
        producer.send(getSession().createTextMessage(str2));
    }

    /**
     * Closes the session and the connection, logging (not propagating) any failure of either,
     * empties the producer/consumer caches and then delegates to the superclass.
     */
    protected void onShutdown() throws Exception {
        try {
            closeSession();
        } catch (Exception e) {
            log.error(String.format("关闭连接会话发生异常，%1$s", e.getMessage()), e);
        }
        try {
            closeConnection();
        } catch (Exception e2) {
            log.error(String.format("关闭连接发生异常，%1$s", e2.getMessage()), e2);
        }
        // closeSession() only clears the caches when a session was present; clear
        // unconditionally here so shutdown always leaves them empty.
        this.messageProducerMap.clear();
        this.messageConsumerMap.clear();
        super.onShutdown();
    }
}
