package be.looorent.jflu.subscriber.rabbitmq;

import be.looorent.jflu.Configuration;
import be.looorent.jflu.Event;
import be.looorent.jflu.subscriber.ConsumptionException;
import be.looorent.jflu.subscriber.QueueListener;
import be.looorent.jflu.subscriber.SubscriptionRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:be/looorent/jflu/subscriber/rabbitmq/RabbitMQQueueListener.class */
class RabbitMQQueueListener implements QueueListener {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQQueueListener.class);
    private final RabbitMQSubscriptionConfiguration configuration;
    private final ObjectMapper jsonMapper;
    private Optional<String> consumerTag;
    private boolean listening;

    public RabbitMQQueueListener(RabbitMQSubscriptionConfiguration rabbitMQSubscriptionConfiguration) {
        if (rabbitMQSubscriptionConfiguration == null) {
            throw new IllegalArgumentException("configuration must not be null");
        }
        this.configuration = rabbitMQSubscriptionConfiguration;
        this.jsonMapper = Configuration.getInstance().getDefaultJsonMapper();
        this.consumerTag = Optional.empty();
    }

    public void listen(final SubscriptionRepository subscriptionRepository) {
        if (subscriptionRepository == null) {
            throw new IllegalArgumentException("subscriptionRepository must not be null");
        }
        try {
            final Channel channel = this.configuration.getChannel();
            this.consumerTag = Optional.of(channel.basicConsume(this.configuration.getQueueName(), false, new DefaultConsumer(channel) { // from class: be.looorent.jflu.subscriber.rabbitmq.RabbitMQQueueListener.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    super.handleDelivery(str, envelope, basicProperties, bArr);
                    Event event = (Event) RabbitMQQueueListener.this.jsonMapper.readValue(bArr, Event.class);
                    try {
                        subscriptionRepository.findAllSubscriptionsFor(event).forEach(subscription -> {
                            subscription.consume(event);
                        });
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        RabbitMQQueueListener.LOG.debug("Event acked: {}", event.getId());
                    } catch (Exception e) {
                        throw new ConsumptionException(event, e);
                    }
                }
            }));
            this.listening = true;
            LOG.debug("Consumer Tag is {}", this.consumerTag.orElse(null));
        } catch (IOException e) {
            LOG.error("An error occurred when consuming events", e);
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        if (!this.listening) {
            throw new IllegalArgumentException("A queue cannot be stopped when it is not listening.");
        }
        try {
            if (this.consumerTag.isPresent()) {
                String str = this.consumerTag.get();
                LOG.debug("Stopping consumer with tag {}", str);
                this.configuration.getChannel().basicCancel(str);
            } else {
                LOG.warn("Trying to stop a consumer, but it has not received any tag (so it probably never started, or stop() has been called twice)");
            }
            this.listening = false;
        } catch (IOException e) {
            LOG.warn("Error when stopping consumer", e);
        }
    }

    public void deleteQueue() {
        String queueName = this.configuration.getQueueName();
        try {
            LOG.info("Deleting queue: {}...", queueName);
            this.configuration.getChannel().queueDelete(queueName);
            LOG.info("Deleting queue: {} : Done", queueName);
        } catch (IOException e) {
            LOG.warn("Error when deleting queue: {}", queueName, e);
        }
    }

    public boolean isListening() {
        return false;
    }
}
