package com.github.combinedmq.rabbitmq;

import com.github.combinedmq.configuration.Configuration;
import com.github.combinedmq.connection.ConnectionFactory;
import com.github.combinedmq.consumer.AbstractConsumer;
import com.github.combinedmq.exception.MqException;
import com.github.combinedmq.message.MessageExecutor;
import com.github.combinedmq.message.MessageListener;
import com.github.combinedmq.message.Queue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/combinedmq/rabbitmq/RabbitMqConsumer.class */
/**
 * Consumer implementation that receives messages from RabbitMQ.
 *
 * <p>For every registered queue, {@link #listen()} opens one channel per registered listener,
 * declares a durable topic exchange and a durable queue, binds them with the queue's binding key
 * and starts a manually-acknowledged consumer that forwards each message body to a
 * {@link MessageExecutor}.
 */
public class RabbitMqConsumer extends AbstractConsumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMqConsumer.class);

    /** Exchange type used for every exchange declared by this consumer. */
    private static final String EXCHANGE_TYPE = "topic";

    /** Deliveries are acknowledged explicitly by the delivery handler, not automatically. */
    private static final boolean AUTO_ACK = false;

    /** Prefetch count applied when the configuration does not provide one. */
    private static final int DEFAULT_PREFETCH_COUNT = 1;

    /** Underlying RabbitMQ connection; obtained on the first call to {@link #listen()} and reused. */
    private Connection connection;

    /**
     * Creates a consumer that obtains its connection from the given factory.
     *
     * @param connectionFactory factory handed to the superclass and later used to get the connection
     */
    public RabbitMqConsumer(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }

    /**
     * Starts consuming from every queue returned by {@code getQueueListener()}.
     *
     * <p>One {@link MessageExecutor} is created per queue, sized by the number of listeners
     * registered for that queue; only the first listener is installed on the executor. A separate
     * channel (and consumer) is opened for each listener in the list.
     *
     * @throws MqException wrapping any exception raised while connecting or setting up a channel
     */
    @Override
    public void listen() throws MqException {
        try {
            if (null == this.connection) {
                this.connection = (Connection) getConnectionFactory().getConnection().getTargetConnection();
            }

            // The prefetch count does not depend on the queue or listener, so resolve it once.
            Configuration configuration = getConfiguration();
            Integer configuredPrefetch = configuration instanceof RabbitMqConfiguration
                    ? ((RabbitMqConfiguration) configuration).getConsumerListener().getPrefetchCount()
                    : null;
            final int prefetchCount = configuredPrefetch != null
                    ? configuredPrefetch.intValue()
                    : DEFAULT_PREFETCH_COUNT;

            for (Map.Entry<Queue, List<MessageListener>> entry : getQueueListener().entrySet()) {
                RabbitMqQueue rabbitMqQueue = (RabbitMqQueue) entry.getKey();
                List<MessageListener> listeners = entry.getValue();
                MessageExecutor messageExecutor = new MessageExecutor("rabbitmq", listeners.size());
                for (MessageListener messageListener : listeners) {
                    // Only the first listener of the queue is attached to the shared executor.
                    if (null == messageExecutor.getMessageListener()) {
                        messageExecutor.setMessageListener(messageListener);
                    }
                    startConsuming(rabbitMqQueue, messageExecutor, prefetchCount);
                }
            }
        } catch (Exception e) {
            throw new MqException(e);
        }
    }

    /**
     * Opens a channel, declares and binds the exchange and queue, and registers a consumer on it.
     *
     * @param rabbitMqQueue   queue descriptor supplying exchange name, queue name and binding key
     * @param messageExecutor executor that receives every delivered message body
     * @param prefetchCount   value passed to {@code basicQos} for the new channel
     * @throws IOException if any channel operation fails
     */
    private void startConsuming(RabbitMqQueue rabbitMqQueue, final MessageExecutor messageExecutor, int prefetchCount)
            throws IOException {
        final String queueName = rabbitMqQueue.getQueueName();
        Channel channel = this.connection.createChannel();
        channel.basicQos(prefetchCount);
        channel.exchangeDeclare(rabbitMqQueue.getExchangeName(), EXCHANGE_TYPE, true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, rabbitMqQueue.getExchangeName(), rabbitMqQueue.getQueueBindingKey(), null);
        channel.basicConsume(queueName, AUTO_ACK, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    // Wait for the executor to finish before acknowledging the delivery.
                    messageExecutor.onMessage(new RabbitMqMessage(body)).get();
                } catch (Throwable th) {
                    // Processing failures are logged and the delivery is still acknowledged below,
                    // so a failed message is not redelivered.
                    log.error("Failed to process message from queue [{}]", queueName, th);
                } finally {
                    // Acknowledge exactly once: acknowledging the same delivery tag again is a
                    // channel-level protocol error.
                    getChannel().basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }
}
