package cn.amossun.starter.event.listener;

import cn.amossun.starter.event.common.Constants;
import cn.amossun.starter.event.exception.EventMethodExecuteException;
import cn.amossun.starter.event.exception.EventMethodNodFoundException;
import cn.amossun.starter.event.property.EventContext;
import cn.amossun.starter.event.property.SimpleEventMessage;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

/* loaded from: input_file:cn/amossun/starter/event/listener/SimpleRemoteEventMessageListener.class */
/**
 * RabbitMQ listener that decodes an incoming {@link Message} into a
 * {@link SimpleEventMessage}, dispatches it to the listener methods registered
 * in the {@link EventContext}, and then acknowledges or rejects the delivery
 * according to the acknowledge mode of the handler that was invoked.
 */
public class SimpleRemoteEventMessageListener implements RemoteEventMessageListener<SimpleEventMessage>, ChannelAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(SimpleRemoteEventMessageListener.class);

    /**
     * Acknowledge mode of the handler most recently invoked on the current thread.
     * Written by {@link #onRemoteEvent} and read/cleared by {@link #onMessage}.
     */
    private final ThreadLocal<AcknowledgeMode> threadLocal = new ThreadLocal<>();

    private final EventContext eventContext;

    /**
     * @param eventContext registry of queue contexts and listener invoke handlers
     */
    public SimpleRemoteEventMessageListener(EventContext eventContext) {
        this.eventContext = eventContext;
    }

    /**
     * @return the names of all queues declared in the event context
     */
    public String[] getListeningQueues() {
        return this.eventContext.getQueueContexts().stream()
                .map(queueContext -> queueContext.getQueueName())
                .toArray(String[]::new);
    }

    /**
     * Handles one delivery: parses the body, dispatches it, then acks/nacks.
     *
     * @param message the raw AMQP message; its body is expected to be JSON text
     * @param channel the channel the message was delivered on
     * @throws EventMethodExecuteException when dispatch failed and the handler's
     *         acknowledge mode is {@link AcknowledgeMode#AUTO}
     */
    @Override
    public void onMessage(Message message, Channel channel) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String payload = new String(message.getBody(), StandardCharsets.UTF_8);
        SimpleEventMessage simpleEventMessage = JSON.parseObject(payload, SimpleEventMessage.class);
        log.info("接收到来自队列: [{}] => {}", consumerQueue, simpleEventMessage);

        boolean failed = false;
        boolean requeue = false;
        try {
            try {
                onRemoteEvent(consumerQueue, simpleEventMessage);
            } catch (EventMethodExecuteException e) {
                log.warn("调用监听方法异常, 拒绝消费信息");
                failed = true;
                requeue = e.isReTry();
            } catch (EventMethodNodFoundException e) {
                log.warn("未找到监听方法, 拒绝消费信息");
                failed = true;
            } catch (Exception e) {
                log.error("调用监听方法异常, 拒绝消费信息: ", e);
                failed = true;
            }
            acknowledge(message, channel, failed, requeue);
        } catch (IOException e) {
            log.error("消费消息basicAck/basicNack异常: ", e);
        } finally {
            // Single cleanup point so no exit path can leak the ack mode to the
            // next delivery handled by this thread.
            this.threadLocal.remove();
        }
    }

    /**
     * Settles the delivery based on the acknowledge mode recorded for this thread.
     * <ul>
     *   <li>no mode recorded, or {@code isAutoAck()}: nothing to do;</li>
     *   <li>not manual: on failure in {@code AUTO} mode an exception is thrown,
     *       otherwise nothing to do;</li>
     *   <li>manual: {@code basicNack} on failure (requeue as requested),
     *       {@code basicAck} on success.</li>
     * </ul>
     */
    private void acknowledge(Message message, Channel channel, boolean failed, boolean requeue) throws IOException {
        AcknowledgeMode acknowledgeMode = this.threadLocal.get();
        if (acknowledgeMode == null || acknowledgeMode.isAutoAck()) {
            return;
        }
        if (!acknowledgeMode.isManual()) {
            if (failed && acknowledgeMode == AcknowledgeMode.AUTO) {
                throw new EventMethodExecuteException("执行调用监听方法异常");
            }
            return;
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (failed) {
            channel.basicNack(deliveryTag, false, requeue);
        } else {
            channel.basicAck(deliveryTag, false);
        }
    }

    /**
     * Looks up the handlers registered for the message's
     * {@code topic.definition} key and for the {@code topic.<ROUTING_KEY_SUFFIX>}
     * key, keeps the handler group whose queue names contain {@code str}, and
     * reflectively invokes each handler with the message payload parsed into the
     * handler's schema class.
     *
     * @param str                the name of the queue the message was consumed from
     * @param simpleEventMessage the decoded event
     * @throws EventMethodExecuteException if no handler is found, if a handler
     *         invocation fails, or on any other error during dispatch
     */
    @Override // cn.amossun.starter.event.listener.RemoteEventMessageListener
    public void onRemoteEvent(String str, SimpleEventMessage simpleEventMessage) {
        log.info("开始执行对应监听方法");
        try {
            String definitionKey = String.format("%s.%s", simpleEventMessage.getTopic(), simpleEventMessage.getDefinition());
            String suffixKey = String.format("%s.%s", simpleEventMessage.getTopic(), Constants.ROUTING_KEY_SUFFIX);
            // NOTE(review): the handler element type is not named anywhere in this
            // file, so the collections below are left untyped as in the original.
            List definitionHandlers = (List) Optional.ofNullable(this.eventContext.getInvokeHandlers(definitionKey)).orElseGet(() -> {
                return new ArrayList();
            });
            List suffixHandlers = (List) Optional.ofNullable(this.eventContext.getInvokeHandlers(suffixKey)).orElseGet(() -> {
                return new ArrayList();
            });
            // Union of both lookups; the set removes handlers registered under both keys.
            HashSet candidates = new HashSet(definitionHandlers.size() + suffixHandlers.size());
            candidates.addAll(definitionHandlers);
            candidates.addAll(suffixHandlers);
            if (CollectionUtil.isEmpty(candidates)) {
                throw new EventMethodNodFoundException("监听方法未找到");
            }
            Map handlersByQueues = (Map) candidates.stream().collect(Collectors.groupingBy((v0) -> {
                return v0.getQueueNames();
            }));
            // Must be a real holder: the lambda below writes the match into it.
            // (It was previously initialised to null, which made every dispatch
            // fail with a NullPointerException.)
            AtomicReference matched = new AtomicReference();
            // If several groups list this queue, the last one visited wins; the
            // iteration order of the grouped map is unspecified.
            handlersByQueues.entrySet().stream().forEach(entry -> {
                if (((List) entry.getKey()).contains(str)) {
                    matched.set(entry.getValue());
                }
            });
            if (CollectionUtil.isEmpty((Collection) matched.get())) {
                throw new EventMethodNodFoundException("队列消费者未找到");
            }
            ((List) matched.get()).stream().forEach(listenerInvokeHandler -> {
                try {
                    // Record the ack mode first so onMessage can settle the delivery
                    // even when the invocation below throws.
                    this.threadLocal.set(listenerInvokeHandler.getListenerContext().getAckMode());
                    listenerInvokeHandler.getMethod().invoke(listenerInvokeHandler.getBean(), JSON.parseObject(simpleEventMessage.getMessage(), listenerInvokeHandler.getListenerContext().getSchemaClass()));
                } catch (Exception e) {
                    log.error("执行调用监听方法异常: ", e);
                    throw new EventMethodExecuteException(e.getMessage(), listenerInvokeHandler.getListenerContext().isReTry());
                }
            });
        } catch (EventMethodExecuteException e) {
            throw e;
        } catch (Exception e) {
            // The wrapper below carries only the message, so log the original
            // exception here to keep its type and stack trace.
            // NOTE(review): EventMethodNodFoundException thrown above also lands
            // here and is wrapped — confirm that is intended, since onMessage has
            // a dedicated catch for it.
            log.warn("分发事件到监听方法失败: ", e);
            throw new EventMethodExecuteException(e.getMessage());
        }
    }
}
