package amqp.spring.camel.component;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.aopalliance.aop.Advice;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.retry.policy.NeverRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.ErrorHandler;

/* loaded from: input_file:amqp/spring/camel/component/SpringAMQPConsumer.class */
public class SpringAMQPConsumer extends DefaultConsumer implements ConnectionListener {
    private static final transient Logger LOG = LoggerFactory.getLogger(SpringAMQPConsumer.class);
    private static final String TTL_QUEUE_ARGUMENT = "x-message-ttl";
    private static final String HA_POLICY_ARGUMENT = "x-ha-policy";
    protected SpringAMQPEndpoint endpoint;
    private RabbitMQMessageListener messageListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:amqp/spring/camel/component/SpringAMQPConsumer$DefaultKeyGenerator.class */
    public static class DefaultKeyGenerator implements MessageKeyGenerator {
        public static final String ALGORITHM = "MD5";

        DefaultKeyGenerator() {
        }

        public Object getKey(Message message) {
            try {
                MessageDigest messageDigest = MessageDigest.getInstance(ALGORITHM);
                messageDigest.update(message.getBody());
                return String.valueOf(messageDigest.digest());
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:amqp/spring/camel/component/SpringAMQPConsumer$RabbitMQMessageListener.class */
    class RabbitMQMessageListener implements MessageListener {
        private MessageConverter msgConverter;
        private SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        private static final long DEFAULT_TIMEOUT_MILLIS = 1000;

        public RabbitMQMessageListener(SpringAMQPEndpoint springAMQPEndpoint) {
            this.listenerContainer.setTaskExecutor(new SpringAMQPExecutor(springAMQPEndpoint));
            RabbitTemplate amqpTemplate = springAMQPEndpoint.getAmqpTemplate();
            if (amqpTemplate != null) {
                this.msgConverter = amqpTemplate.getMessageConverter();
                this.listenerContainer.setConnectionFactory(amqpTemplate.getConnectionFactory());
            } else {
                SpringAMQPConsumer.LOG.error("No AMQP Template found! Cannot initialize message conversion or connections!");
            }
            this.listenerContainer.setQueueNames(new String[]{springAMQPEndpoint.getQueueName()});
            this.listenerContainer.setConcurrentConsumers(springAMQPEndpoint.getConcurrentConsumers());
            this.listenerContainer.setPrefetchCount(springAMQPEndpoint.getPrefetchCount());
            this.listenerContainer.setErrorHandler(getErrorHandler());
            this.listenerContainer.setAdviceChain(getAdviceChain());
            this.listenerContainer.setShutdownTimeout(DEFAULT_TIMEOUT_MILLIS);
            this.listenerContainer.setReceiveTimeout(DEFAULT_TIMEOUT_MILLIS);
            this.listenerContainer.setRecoveryInterval(500L);
            this.listenerContainer.setChannelTransacted(false);
            this.listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
        }

        public void start() {
            this.listenerContainer.setMessageListener(this);
            this.listenerContainer.start();
            SpringAMQPConsumer.LOG.info("Started AMQP Async Listeners for {}", SpringAMQPConsumer.this.endpoint.getEndpointUri());
        }

        public void stop() {
            this.listenerContainer.setConcurrentConsumers(0);
            this.listenerContainer.setPrefetchCount(0);
            this.listenerContainer.stop();
        }

        public void shutdown() {
            this.listenerContainer.shutdown();
            this.listenerContainer.destroy();
        }

        public final ErrorHandler getErrorHandler() {
            return new ErrorHandler() { // from class: amqp.spring.camel.component.SpringAMQPConsumer.RabbitMQMessageListener.1
                public void handleError(Throwable th) {
                    if (th instanceof AmqpConnectException) {
                        SpringAMQPConsumer.LOG.error("AMQP Connection error, marking this connection as failed");
                        SpringAMQPConsumer.this.onClose(null);
                    }
                    SpringAMQPConsumer.this.getExceptionHandler().handleException(th);
                }
            };
        }

        public final Advice[] getAdviceChain() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setRetryPolicy(new NeverRetryPolicy());
            StatefulRetryOperationsInterceptorFactoryBean statefulRetryOperationsInterceptorFactoryBean = new StatefulRetryOperationsInterceptorFactoryBean();
            statefulRetryOperationsInterceptorFactoryBean.setRetryOperations(retryTemplate);
            statefulRetryOperationsInterceptorFactoryBean.setMessageKeyGeneretor(new DefaultKeyGenerator());
            return new Advice[]{statefulRetryOperationsInterceptorFactoryBean.getObject()};
        }

        public void onMessage(Message message) {
            if (this.msgConverter == null) {
                throw new IllegalStateException("No message converter present - cannot processs messages!");
            }
            SpringAMQPConsumer.LOG.debug("Received message for routing key {}", message.getMessageProperties().getReceivedRoutingKey());
            DefaultExchange defaultExchange = new DefaultExchange(SpringAMQPConsumer.this.endpoint, SpringAMQPMessage.getExchangePattern(message));
            defaultExchange.setIn(SpringAMQPMessage.fromAMQPMessage(this.msgConverter, message));
            try {
                SpringAMQPConsumer.this.getProcessor().process(defaultExchange);
            } catch (Throwable th) {
                defaultExchange.setException(th);
            }
            Address replyToAddress = message.getMessageProperties().getReplyToAddress();
            if (replyToAddress != null) {
                SpringAMQPMessage springAMQPMessage = new SpringAMQPMessage(defaultExchange.getOut());
                defaultExchange.setOut(springAMQPMessage);
                try {
                    SpringAMQPConsumer.this.endpoint.getAmqpTemplate().send(replyToAddress.getExchangeName(), replyToAddress.getRoutingKey(), springAMQPMessage.toAMQPMessage(this.msgConverter));
                } catch (AmqpConnectException e) {
                    SpringAMQPConsumer.LOG.error("AMQP Connection error, marking this connection as failed");
                    SpringAMQPConsumer.this.onClose(null);
                }
            }
        }
    }

    /* loaded from: input_file:amqp/spring/camel/component/SpringAMQPConsumer$SpringAMQPExecutor.class */
    class SpringAMQPExecutor extends SimpleAsyncTaskExecutor {
        private SpringAMQPEndpoint endpoint;

        SpringAMQPExecutor(SpringAMQPEndpoint springAMQPEndpoint) {
            this.endpoint = springAMQPEndpoint;
        }

        public void execute(Runnable runnable) {
            super.execute(new SpringAMQPExecutorTask(this.endpoint, runnable));
        }

        public void execute(Runnable runnable, long j) {
            super.execute(new SpringAMQPExecutorTask(this.endpoint, runnable), j);
        }
    }

    /* loaded from: input_file:amqp/spring/camel/component/SpringAMQPConsumer$SpringAMQPExecutorTask.class */
    class SpringAMQPExecutorTask implements Runnable {
        private SpringAMQPEndpoint endpoint;
        private Runnable delegateTask;
        public static final long RECOVERY_INTERVAL_MILLISECONDS = 30000;

        public SpringAMQPExecutorTask(SpringAMQPEndpoint springAMQPEndpoint, Runnable runnable) {
            this.endpoint = springAMQPEndpoint;
            this.delegateTask = runnable;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z;
            do {
                try {
                    z = false;
                    declareAMQPEntities();
                    this.delegateTask.run();
                } catch (Exception e) {
                    z = true;
                    SpringAMQPConsumer.LOG.error("Error consuming endpoint " + this.endpoint + ". " + e.getMessage(), e);
                    try {
                        Thread.sleep(RECOVERY_INTERVAL_MILLISECONDS);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Unrecoverable interruption on consumer restart");
                    }
                }
            } while (z);
        }

        protected void declareAMQPEntities() {
            declareBinding(declareExchange(), declareQueue());
        }

        protected Exchange declareExchange() {
            Exchange createAMQPExchange = this.endpoint.createAMQPExchange();
            if (this.endpoint.isUsingDefaultExchange()) {
                SpringAMQPConsumer.LOG.info("Using default exchange; will not declare one for endpoint {}.", this.endpoint);
            } else {
                try {
                    this.endpoint.amqpAdministration.declareExchange(createAMQPExchange);
                    SpringAMQPConsumer.LOG.info("Declared exchange {} for endpoint {}.", createAMQPExchange.getName(), this.endpoint);
                } catch (AmqpConnectException e) {
                    SpringAMQPConsumer.LOG.error(String.format("Consumer cannot connect to broker for endpoint %s", this.endpoint.toString()), e);
                    throw e;
                } catch (AmqpIOException e2) {
                    SpringAMQPConsumer.LOG.warn(String.format("Could not declare exchange %s for endpoint %s; possible re-declaration of a different type?", createAMQPExchange.getName(), this.endpoint.toString()), e2);
                }
            }
            return createAMQPExchange;
        }

        protected Queue declareQueue() {
            HashMap hashMap = new HashMap();
            if (this.endpoint.getTimeToLive() != null) {
                hashMap.put(SpringAMQPConsumer.TTL_QUEUE_ARGUMENT, this.endpoint.getTimeToLive());
            }
            if (this.endpoint.isHa()) {
                hashMap.put(SpringAMQPConsumer.HA_POLICY_ARGUMENT, "all");
            }
            Queue queue = new Queue(this.endpoint.queueName, this.endpoint.durable, this.endpoint.exclusive, this.endpoint.autodelete, hashMap);
            this.endpoint.getAmqpAdministration().declareQueue(queue);
            SpringAMQPConsumer.LOG.info("Declared queue {} for endpoint {}.", queue.getName(), this.endpoint);
            return queue;
        }

        protected Binding declareBinding(Exchange exchange, Queue queue) {
            Binding noargs;
            if (!(exchange instanceof HeadersExchange)) {
                noargs = exchange instanceof FanoutExchange ? BindingBuilder.bind(queue).to((FanoutExchange) exchange) : BindingBuilder.bind(queue).to(exchange).with(this.endpoint.routingKey).noargs();
            } else {
                if (this.endpoint.routingKey == null) {
                    throw new IllegalStateException("Specified a header exchange without a key/value match");
                }
                if (this.endpoint.routingKey.contains("|") && this.endpoint.routingKey.contains("&")) {
                    throw new IllegalArgumentException("You cannot mix AND and OR expressions within a header binding");
                }
                Map<String, Object> parseKeyValues = SpringAMQPConsumer.parseKeyValues(this.endpoint.routingKey);
                BindingBuilder.HeadersExchangeMapConfigurer headersExchangeMapConfigurer = BindingBuilder.bind(queue).to((HeadersExchange) exchange);
                noargs = this.endpoint.routingKey.contains("|") ? headersExchangeMapConfigurer.whereAny(parseKeyValues).match() : headersExchangeMapConfigurer.whereAll(parseKeyValues).match();
            }
            if (this.endpoint.isUsingDefaultExchange()) {
                SpringAMQPConsumer.LOG.info("Using default exchange for endpoint {}.  Default exchange is implicitly bound to every queue, with a routing key equal to the queue name.", this.endpoint);
            } else if (noargs != null) {
                SpringAMQPConsumer.LOG.info("Declaring binding {} for endpoint {}.", noargs.getRoutingKey(), this.endpoint);
                this.endpoint.getAmqpAdministration().declareBinding(noargs);
            }
            return noargs;
        }
    }

    public SpringAMQPConsumer(SpringAMQPEndpoint springAMQPEndpoint, Processor processor) {
        super(springAMQPEndpoint, processor);
        this.endpoint = springAMQPEndpoint;
        this.messageListener = new RabbitMQMessageListener(springAMQPEndpoint);
    }

    public void doStart() throws Exception {
        super.doStart();
        if (this.messageListener.listenerContainer.isActive()) {
            return;
        }
        this.messageListener.start();
    }

    public void doShutdown() throws Exception {
        this.messageListener.shutdown();
        super.shutdown();
    }

    protected static Map<String, Object> parseKeyValues(String str) {
        StringTokenizer stringTokenizer = new StringTokenizer(str, "&|");
        HashMap hashMap = new HashMap();
        while (stringTokenizer.hasMoreTokens()) {
            String nextToken = stringTokenizer.nextToken();
            String[] split = nextToken.split("=");
            if (split.length != 2) {
                throw new IllegalArgumentException("Couldn't parse key/value pair [" + nextToken + "] out of string: " + str);
            }
            hashMap.put(split[0], split[1]);
        }
        return hashMap;
    }

    public void onCreate(Connection connection) {
        LOG.info("Network connection created to broker for endpoint {}", getEndpoint());
    }

    public void onClose(Connection connection) {
        LOG.info("Network connection closed to broker for endpoint {}", getEndpoint());
    }
}
