package kieker.analysis.plugin.reader.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.AbstractStringRegistryReaderPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.filesystem.FSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(description = "A plugin that reads monitoring records from an AMQP queue", outputPorts = {@OutputPort(name = "monitoringRecords", eventTypes = {IMonitoringRecord.class}, description = "Output port of the AMQP reader")}, configuration = {@Property(name = "uri", defaultValue = "amqp://localhost", description = "Server URI of the AMQP server"), @Property(name = "queueName", defaultValue = FSUtil.FILE_PREFIX, description = "AMQP queue name"), @Property(name = "heartbeat", defaultValue = "60", description = "Heartbeat interval (in seconds)"), @Property(name = AmqpReader.CONFIG_PROPERTY_CACHE_DURATION, defaultValue = "60", description = "Cache duration (in seconds) for string registries")})
/* loaded from: input_file:kieker/analysis/plugin/reader/amqp/AmqpReader.class */
public final class AmqpReader extends AbstractStringRegistryReaderPlugin {
    public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
    public static final String CONFIG_PROPERTY_URI = "uri";
    public static final String CONFIG_PROPERTY_QUEUENAME = "queueName";
    public static final String CONFIG_PROPERTY_HEARTBEAT = "heartbeat";
    public static final String CONFIG_PROPERTY_CACHE_DURATION = "cacheDuration";
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpReader.class.getCanonicalName());
    private static final byte REGISTRY_RECORD_ID = -1;
    private static final byte REGULAR_RECORD_ID = 1;
    private final String uri;
    private final String queueName;
    private final int heartbeat;
    private final long cacheDuration;
    private volatile Connection connection;
    private volatile Channel channel;
    private volatile QueueingConsumer consumer;
    private volatile boolean terminated;

    public AmqpReader(Configuration configuration, IProjectContext iProjectContext) {
        super(configuration, iProjectContext, CONFIG_PROPERTY_CACHE_DURATION, TimeUnit.SECONDS);
        this.uri = this.configuration.getStringProperty("uri");
        this.queueName = this.configuration.getStringProperty("queueName");
        this.heartbeat = this.configuration.getIntProperty("heartbeat");
        this.cacheDuration = this.configuration.getLongProperty(CONFIG_PROPERTY_CACHE_DURATION);
    }

    @Override // kieker.analysis.plugin.reader.AbstractStringRegistryReaderPlugin, kieker.analysis.plugin.reader.AbstractReaderPlugin, kieker.analysis.plugin.IPlugin
    public boolean init() {
        if (!super.init()) {
            return false;
        }
        try {
            this.connection = createConnection();
            this.channel = this.connection.createChannel();
            this.consumer = new QueueingConsumer(this.channel);
            return true;
        } catch (IOException e) {
            handleInitializationError(e);
            return false;
        } catch (URISyntaxException e2) {
            handleInitializationError(e2);
            return false;
        } catch (KeyManagementException e3) {
            handleInitializationError(e3);
            return false;
        } catch (NoSuchAlgorithmException e4) {
            handleInitializationError(e4);
            return false;
        } catch (TimeoutException e5) {
            handleInitializationError(e5);
            return false;
        }
    }

    private void handleInitializationError(Throwable th) {
        LOGGER.error("An error occurred initializing the AMQP reader:", th);
    }

    private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        return connectionFactory.newConnection();
    }

    @Override // kieker.analysis.analysisComponent.AbstractAnalysisComponent, kieker.analysis.analysisComponent.IAnalysisComponent
    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty("uri", this.uri);
        configuration.setProperty("queueName", this.queueName);
        configuration.setProperty("heartbeat", Integer.toString(this.heartbeat));
        configuration.setProperty(CONFIG_PROPERTY_CACHE_DURATION, Long.toString(this.cacheDuration));
        return configuration;
    }

    @Override // kieker.analysis.plugin.reader.IReaderPlugin
    public boolean read() {
        ensureThreadsStarted();
        try {
            this.channel.basicConsume(this.queueName, true, this.consumer);
            while (!this.terminated) {
                ByteBuffer wrap = ByteBuffer.wrap(this.consumer.nextDelivery().getBody());
                byte b = wrap.get();
                switch (b) {
                    case -1:
                        handleRegistryRecord(wrap);
                        break;
                    case 1:
                        handleRegularRecord(wrap);
                        break;
                    default:
                        this.logger.error(String.format("Unknown record type: %02x", Byte.valueOf(b)));
                        break;
                }
            }
            return true;
        } catch (IOException e) {
            this.logger.error("Error while reading from queue {}", this.queueName, e);
            return false;
        } catch (InterruptedException e2) {
            this.logger.error("Consumer was interrupted on queue {}", this.queueName, e2);
            return false;
        }
    }

    @Override // kieker.analysis.plugin.IPlugin
    public void terminate(boolean z) {
        try {
            this.terminated = true;
            this.connection.close();
        } catch (IOException e) {
            this.logger.error("IO error while trying to close the connection.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // kieker.analysis.plugin.reader.AbstractStringRegistryReaderPlugin
    public void deliverRecord(IMonitoringRecord iMonitoringRecord) {
        deliver("monitoringRecords", iMonitoringRecord);
    }
}
