package be.looorent.jflu.subscriber.rabbitmq;

import be.looorent.jflu.Configuration;
import be.looorent.jflu.Event;
import be.looorent.jflu.subscriber.ConsumptionException;
import be.looorent.jflu.subscriber.QueueListener;
import be.looorent.jflu.subscriber.SubscriptionRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:be/looorent/jflu/subscriber/rabbitmq/RabbitMQQueueListener.class */
class RabbitMQQueueListener implements QueueListener {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQQueueListener.class);
    private final RabbitMQSubscriptionConfiguration configuration;
    private final ObjectMapper jsonMapper;

    public RabbitMQQueueListener(RabbitMQSubscriptionConfiguration rabbitMQSubscriptionConfiguration) {
        if (rabbitMQSubscriptionConfiguration == null) {
            throw new IllegalArgumentException("configuration must not be null");
        }
        this.configuration = rabbitMQSubscriptionConfiguration;
        this.jsonMapper = Configuration.getInstance().getDefaultJsonMapper();
    }

    public void listen(final SubscriptionRepository subscriptionRepository) {
        if (subscriptionRepository == null) {
            throw new IllegalArgumentException("subscriptionRepository must not be null");
        }
        try {
            final Channel channel = this.configuration.getChannel();
            channel.basicConsume(this.configuration.getQueueName(), false, new DefaultConsumer(channel) { // from class: be.looorent.jflu.subscriber.rabbitmq.RabbitMQQueueListener.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    super.handleDelivery(str, envelope, basicProperties, bArr);
                    Event event = (Event) RabbitMQQueueListener.this.jsonMapper.readValue(bArr, Event.class);
                    try {
                        subscriptionRepository.findAllSubscriptionsFor(event).forEach(subscription -> {
                            subscription.consume(event);
                        });
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        RabbitMQQueueListener.LOG.debug("Event acked: {}", event.getId());
                    } catch (Exception e) {
                        throw new ConsumptionException(event, e);
                    }
                }
            });
        } catch (IOException e) {
            LOG.error("An error occurred when consuming events", e);
            throw new RuntimeException(e);
        }
    }
}
