package me.insidezhou.southernquiet.job.driver;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpAutoConfiguration;
import me.insidezhou.southernquiet.amqp.rabbit.AmqpMessageRecover;
import me.insidezhou.southernquiet.amqp.rabbit.DirectRabbitListenerContainerFactoryConfigurer;
import me.insidezhou.southernquiet.job.AmqpJobAutoConfiguration;
import me.insidezhou.southernquiet.job.JobProcessor;
import me.insidezhou.southernquiet.util.Amplifier;
import me.insidezhou.southernquiet.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:me/insidezhou/southernquiet/job/driver/AmqpJobProcessorManager.class */
public class AmqpJobProcessorManager extends AbstractJobProcessorManager implements Lifecycle, RabbitListenerConfigurer {
    private static final Logger log = LoggerFactory.getLogger(AmqpJobProcessorManager.class);
    private final SmartMessageConverter messageConverter;
    private final ConnectionFactory connectionFactory;
    private final List<Tuple<RabbitListenerEndpoint, JobProcessor, String>> listenerEndpoints;
    private final AmqpAutoConfiguration.Properties amqpProperties;
    private final AmqpJobAutoConfiguration.Properties amqpJobProperties;
    private final Amplifier amplifier;
    private final RabbitProperties rabbitProperties;
    private final RabbitAdmin rabbitAdmin;
    private final RabbitTemplate rabbitTemplate;

    public AmqpJobProcessorManager(RabbitAdmin rabbitAdmin, AmqpJobArranger<?> amqpJobArranger, Amplifier amplifier, AmqpJobAutoConfiguration.Properties properties, AmqpAutoConfiguration.Properties properties2, RabbitTransactionManager rabbitTransactionManager, RabbitProperties rabbitProperties, ApplicationContext applicationContext) {
        super(applicationContext);
        this.listenerEndpoints = new ArrayList();
        this.amplifier = amplifier;
        this.rabbitAdmin = rabbitAdmin;
        this.amqpJobProperties = properties;
        this.amqpProperties = properties2;
        this.rabbitProperties = rabbitProperties;
        this.messageConverter = amqpJobArranger.getMessageConverter();
        this.connectionFactory = rabbitTransactionManager.getConnectionFactory();
        this.rabbitTemplate = new RabbitTemplate(this.connectionFactory);
    }

    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        this.listenerEndpoints.forEach(tuple -> {
            RabbitListenerEndpoint rabbitListenerEndpoint = (RabbitListenerEndpoint) tuple.getFirst();
            JobProcessor jobProcessor = (JobProcessor) tuple.getSecond();
            String str = (String) tuple.getThird();
            Amplifier amplifier = this.amplifier;
            if (!StringUtils.isEmpty(jobProcessor.amplifierBeanName())) {
                amplifier = (Amplifier) this.applicationContext.getBean(jobProcessor.amplifierBeanName(), Amplifier.class);
            }
            DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurer = new DirectRabbitListenerContainerFactoryConfigurer(this.rabbitProperties, new AmqpMessageRecover(this.rabbitTemplate, amplifier, "", getDeadRouting(jobProcessor, str), this.amqpProperties), this.amqpProperties);
            DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory();
            directRabbitListenerContainerFactory.setMessageConverter(this.messageConverter);
            directRabbitListenerContainerFactory.setAcknowledgeMode(this.amqpProperties.getAcknowledgeMode());
            directRabbitListenerContainerFactoryConfigurer.configure(directRabbitListenerContainerFactory, this.connectionFactory);
            rabbitListenerEndpointRegistrar.registerEndpoint(rabbitListenerEndpoint, directRabbitListenerContainerFactory);
        });
    }

    protected void initProcessor(JobProcessor jobProcessor, Object obj, Method method) {
        String name = method.getName();
        String listenerName = getListenerName(jobProcessor, name);
        String listenerRouting = getListenerRouting(jobProcessor, name);
        this.listenerEndpoints.stream().filter(tuple -> {
            return jobProcessor.job() == ((JobProcessor) tuple.getSecond()).job() && listenerName.equals(tuple.getThird());
        }).findAny().ifPresent(tuple2 -> {
            log.warn("监听器重复: queue={}, listener={}#{}, job={}", new Object[]{listenerRouting, obj.getClass().getName(), name, jobProcessor.job().getSimpleName()});
        });
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        simpleRabbitListenerEndpoint.setId(UUID.randomUUID().toString());
        simpleRabbitListenerEndpoint.setQueueNames(new String[]{listenerRouting});
        simpleRabbitListenerEndpoint.setAdmin(this.rabbitAdmin);
        declareExchangeAndQueue(jobProcessor, name);
        Class job = jobProcessor.job();
        ParameterizedTypeReference forType = ParameterizedTypeReference.forType(job);
        simpleRabbitListenerEndpoint.setMessageListener(message -> {
            Object fromMessage = this.messageConverter.fromMessage(message, forType);
            if (log.isDebugEnabled()) {
                log.debug("监听器收到通知: queue={}, listener={}#{}({}), job={}, message={}", new Object[]{simpleRabbitListenerEndpoint.getQueueNames(), obj.getClass().getName(), name, simpleRabbitListenerEndpoint.getId(), fromMessage.getClass().getSimpleName(), message});
            }
            try {
                method.invoke(obj, Arrays.stream(method.getParameters()).map(parameter -> {
                    Class<?> type = parameter.getType();
                    if (type.isInstance(fromMessage)) {
                        return fromMessage;
                    }
                    if (type.isInstance(jobProcessor)) {
                        return jobProcessor;
                    }
                    log.warn("不支持在任务监听器中使用此类型的参数\tparameter={}, job={}", parameter.getClass(), job);
                    try {
                        return type.newInstance();
                    } catch (Exception e) {
                        return null;
                    }
                }).toArray());
            } catch (RuntimeException e) {
                log.error("任务处理器抛出异常", e);
                throw e;
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        });
        this.listenerEndpoints.add(new Tuple<>(simpleRabbitListenerEndpoint, jobProcessor, listenerName));
    }

    private String getDeadSource(JobProcessor jobProcessor, String str) {
        return suffix("DEAD." + AmqpJobArranger.getQueueSource(jobProcessor.job()), jobProcessor, str);
    }

    private String getDeadRouting(JobProcessor jobProcessor, String str) {
        return AmqpJobArranger.getRouting(this.amqpJobProperties.getNamePrefix(), getDeadSource(jobProcessor, str));
    }

    private String getListenerRouting(JobProcessor jobProcessor, String str) {
        return suffix(AmqpJobArranger.getRouting(this.amqpJobProperties.getNamePrefix(), (Class<?>) jobProcessor.job()), jobProcessor, str);
    }

    private String getListenerName(JobProcessor jobProcessor, String str) {
        String name = jobProcessor.name();
        if (StringUtils.isEmpty(name)) {
            name = str;
        }
        Assert.hasText(name, "处理器的名称不能为空");
        return name;
    }

    private String suffix(String str, JobProcessor jobProcessor, String str2) {
        return str + "#" + getListenerName(jobProcessor, str2);
    }

    private void declareExchangeAndQueue(JobProcessor jobProcessor, String str) {
        String listenerRouting = getListenerRouting(jobProcessor, str);
        FanoutExchange fanoutExchange = new FanoutExchange(AmqpJobArranger.getExchange(this.amqpJobProperties.getNamePrefix(), (Class<?>) jobProcessor.job()));
        Queue queue = new Queue(listenerRouting);
        this.rabbitAdmin.declareExchange(fanoutExchange);
        this.rabbitAdmin.declareQueue(queue);
        this.rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with(listenerRouting).noargs());
        HashMap hashMap = new HashMap();
        hashMap.put("x-dead-letter-exchange", "");
        hashMap.put("x-dead-letter-routing-key", queue.getName());
        this.rabbitAdmin.declareQueue(new Queue(getDeadRouting(jobProcessor, str), true, false, false, hashMap));
    }

    public void start() {
        this.rabbitTemplate.start();
    }

    public void stop() {
        this.rabbitTemplate.stop();
    }

    public boolean isRunning() {
        return this.rabbitTemplate.isRunning();
    }

    public RabbitTemplate getRabbitTemplate() {
        return this.rabbitTemplate;
    }
}
