package bee.cloud.rocketmq;

import bee.cloud.config.BConfig;
import bee.cloud.config.BSystem;
import bee.cloud.ri.mq.DataBody;
import bee.cloud.ri.mq.DataBodys;
import bee.tool.Tool;
import bee.tool.string.Format;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:bee/cloud/rocketmq/RocketProducer.class */
public class RocketProducer {
    private Object lock = new Object();
    private Map<String, MQProducer> producers = new HashMap();
    private Set<String> init = new HashSet();

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [org.apache.rocketmq.client.producer.DefaultMQProducer] */
    /* JADX WARN: Type inference failed for: r0v22, types: [java.lang.String] */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v8, types: [java.lang.Throwable] */
    private synchronized void initDefault(String str) {
        if (this.init.contains(str)) {
            return;
        }
        this.init.add(str);
        ?? r0 = this.lock;
        synchronized (r0) {
            MQProducer defaultMQProducer = new DefaultMQProducer(str);
            String single = Tool.Value.toSingle(new String[]{BSystem.get("mq.host"), BConfig.asText("mq.massege.host")});
            if (Format.isEmpty(single)) {
                Tool.Log.info("未配置消息服务器，请在Bee配置文件中配置消息服务器IP地址与端口：{}。", new Object[]{"mq.massege.host"});
                return;
            }
            r0 = defaultMQProducer;
            r0.setNamesrvAddr(single);
            try {
                defaultMQProducer.start();
                this.producers.put(str, defaultMQProducer);
                r0 = "[{}]注册生产者{}成功！";
                Tool.Log.info("[{}]注册生产者{}成功！", new Object[]{single, str});
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [org.apache.rocketmq.client.producer.TransactionMQProducer] */
    /* JADX WARN: Type inference failed for: r0v25, types: [java.lang.String] */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v8, types: [java.lang.Throwable] */
    private synchronized void init(String str) {
        if (this.init.contains(str)) {
            return;
        }
        this.init.add(str);
        ?? r0 = this.lock;
        synchronized (r0) {
            MQProducer transactionMQProducer = new TransactionMQProducer(str);
            String single = Tool.Value.toSingle(new String[]{BSystem.get("mq.host"), BConfig.asText("mq.massege.host")});
            if (Format.isEmpty(single)) {
                Tool.Log.info("未配置消息服务器，请在Bee配置文件中配置消息服务器IP地址与端口：{}。", new Object[]{"mq.massege.host"});
                return;
            }
            r0 = transactionMQProducer;
            r0.setNamesrvAddr(single);
            try {
                transactionMQProducer.setTransactionListener(new TransactionListener() { // from class: bee.cloud.rocketmq.RocketProducer.1
                    long start;

                    public LocalTransactionState executeLocalTransaction(Message message, Object obj) {
                        this.start = System.currentTimeMillis();
                        return LocalTransactionState.UNKNOW;
                    }

                    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                        Tool.Log.info("事务补偿过程执行了，time={}", new Object[]{Long.valueOf((System.currentTimeMillis() - this.start) / 1000)});
                        return LocalTransactionState.COMMIT_MESSAGE;
                    }
                });
                this.producers.put(str, transactionMQProducer);
                transactionMQProducer.start();
                r0 = "[{}]注册生产者{}成功！";
                Tool.Log.info("[{}]注册生产者{}成功！", new Object[]{single, str});
            } catch (MQClientException e) {
                this.init.remove(str);
            }
        }
    }

    public SendResult send(DataBodys dataBodys) {
        if (!this.producers.containsKey(dataBodys.getGroupId())) {
            init(dataBodys.getGroupId());
        }
        if (!this.producers.containsKey(dataBodys.getGroupId())) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        dataBodys.getBody().forEach(bArr -> {
            arrayList.add(new Message(dataBodys.getTopic(), dataBodys.getTag(), bArr));
        });
        try {
            return this.producers.get(dataBodys.getGroupId()).send(arrayList);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public SendResult send(DataBody dataBody) {
        if (!this.producers.containsKey(dataBody.getGroupId())) {
            init(dataBody.getGroupId());
        }
        if (!this.producers.containsKey(dataBody.getGroupId())) {
            return null;
        }
        try {
            return this.producers.get(dataBody.getGroupId()).send(new Message(dataBody.getTopic(), dataBody.getTag(), dataBody.getBody()));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}
