package kieker.analysis.plugin.reader.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import kieker.analysis.plugin.reader.newio.IRawDataProcessor;
import kieker.analysis.plugin.reader.newio.IRawDataReader;
import kieker.analysis.plugin.reader.newio.Outcome;
import kieker.common.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kieker/analysis/plugin/reader/amqp/ChunkingAmqpReader.class */
public class ChunkingAmqpReader implements IRawDataReader {
    public static final String CONFIG_PROPERTY_URI = "uri";
    public static final String CONFIG_PROPERTY_QUEUENAME = "queueName";
    public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat";
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ChunkingAmqpReader.class);
    private final String uri;
    private final String queueName;
    private final int heartbeat;
    private final IRawDataProcessor processor;
    private volatile Connection connection;
    private volatile Channel channel;
    private volatile QueueingConsumer consumer;
    private volatile boolean terminated;

    public ChunkingAmqpReader(Configuration configuration, IRawDataProcessor iRawDataProcessor) {
        this.uri = configuration.getStringProperty("uri");
        this.queueName = configuration.getStringProperty("queueName");
        this.heartbeat = configuration.getIntProperty("heartbeat");
        this.processor = iRawDataProcessor;
    }

    @Override // kieker.analysis.plugin.reader.newio.IRawDataReader
    public Outcome onInitialization() {
        try {
            this.connection = createConnection();
            this.channel = this.connection.createChannel();
            this.consumer = new QueueingConsumer(this.channel);
            return Outcome.SUCCESS;
        } catch (IOException | URISyntaxException | KeyManagementException | NoSuchAlgorithmException | TimeoutException e) {
            handleInitializationError(e);
            return Outcome.FAILURE;
        }
    }

    private void handleInitializationError(Throwable th) {
        LOGGER.error("An error occurred initializing the AMQP reader.", th);
    }

    private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        return connectionFactory.newConnection();
    }

    @Override // kieker.analysis.plugin.reader.newio.IRawDataReader
    public Outcome read() {
        IRawDataProcessor iRawDataProcessor = this.processor;
        try {
            this.channel.basicConsume(this.queueName, true, (Consumer) this.consumer);
            while (!this.terminated) {
                iRawDataProcessor.decodeAndDeliverRecords(this.consumer.nextDelivery().getBody());
            }
            return Outcome.SUCCESS;
        } catch (IOException e) {
            LOGGER.error("Error while reading from queue {}", this.queueName, e);
            return Outcome.FAILURE;
        } catch (InterruptedException e2) {
            LOGGER.error("Consumer was interrupted on queue {}", this.queueName, e2);
            return Outcome.FAILURE;
        }
    }

    @Override // kieker.analysis.plugin.reader.newio.IRawDataReader
    public Outcome onTermination() {
        try {
            this.terminated = true;
            this.connection.close();
            return Outcome.SUCCESS;
        } catch (IOException e) {
            LOGGER.error("IO error while trying to close the connection.", (Throwable) e);
            return Outcome.FAILURE;
        }
    }
}
