package bee.cloud.rocketmq;

import bee.cloud.config.BConfig;
import bee.cloud.ri.mq.Body;
import bee.cloud.ri.mq.Clients;
import bee.cloud.ri.mq.Consumer;
import bee.cloud.ri.mq.MsgBody;
import bee.tool.Tool;
import bee.tool.string.Format;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:bee/cloud/rocketmq/RocketConsumer.class */
public class RocketConsumer {
    private Object lock = new Object();
    private Map<String, MQPushConsumer> consumers = new HashMap();
    private Set<String> init = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Throwable, java.lang.Object] */
    public synchronized void init(final Body body) {
        MQPushConsumer defaultMQPushConsumer;
        String asText;
        if (this.init.contains(body.getGroupId())) {
            return;
        }
        this.init.add(body.getGroupId());
        synchronized (this.lock) {
            try {
                defaultMQPushConsumer = new DefaultMQPushConsumer(body.getGroupId());
                asText = BConfig.asText("mq.massege.host");
            } catch (MQClientException e) {
                this.init.remove(body.getGroupId());
            }
            if (Format.isEmpty(asText)) {
                Tool.Log.info("未配置消息服务器，请在Bee配置文件中配置消息服务器IP地址与端口：{}。", new Object[]{"mq.massege.host"});
                return;
            }
            defaultMQPushConsumer.setNamesrvAddr(asText);
            defaultMQPushConsumer.subscribe(body.getTopic(), Format.isEmpty(body.getTag()) ? "*" : body.getTag());
            defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() { // from class: bee.cloud.rocketmq.RocketConsumer.1
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    ArrayList arrayList = new ArrayList();
                    for (MessageExt messageExt : list) {
                        MsgBody msgBody = new MsgBody();
                        msgBody.setFlag(messageExt.getFlag());
                        msgBody.setMsgId(messageExt.getMsgId());
                        msgBody.setProperties(messageExt.getProperties());
                        msgBody.setTransactionId(messageExt.getTransactionId());
                        msgBody.setGroupId(msgBody.getGroupId());
                        msgBody.setTopic(msgBody.getTopic());
                        msgBody.setTag(msgBody.getTag());
                        msgBody.setBody(messageExt.getBody());
                        MsgBody.Ext ext = msgBody.getExt();
                        ext.setBodyCRC(messageExt.getBodyCRC());
                        ext.setBornHost(messageExt.getBornHost());
                        ext.setBornTimestamp(messageExt.getBornTimestamp());
                        ext.setCommitLogOffset(messageExt.getCommitLogOffset());
                        ext.setPreparedTransactionOffset(messageExt.getPreparedTransactionOffset());
                        ext.setQueueId(messageExt.getQueueId());
                        ext.setQueueOffset(messageExt.getQueueOffset());
                        ext.setReconsumeTimes(messageExt.getReconsumeTimes());
                        ext.setStoreHost(messageExt.getStoreHost());
                        ext.setStoreSize(messageExt.getStoreSize());
                        ext.setStoreTimestamp(messageExt.getStoreTimestamp());
                        ext.setSysFlag(messageExt.getSysFlag());
                        arrayList.add(msgBody);
                    }
                    Consumer consumer = Clients.getConsumer(body.getGroupId(), body.getTopic(), body.getTag());
                    if (arrayList.size() == 1) {
                        consumer.message((MsgBody) arrayList.get(0));
                    } else {
                        consumer.message(arrayList);
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            defaultMQPushConsumer.start();
            this.consumers.put(body.getGroupId(), defaultMQPushConsumer);
        }
    }
}
