package kieker.analysis.tt.reader.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import kieker.common.record.IMonitoringRecord;
import kieker.common.registry.reader.ReaderRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads messages from an AMQP queue and dispatches them, based on the first byte of each
 * message body, to a handler for registry records or a handler for regular records.
 *
 * <p>The connection, channel, consumer and both handler threads are created by {@link #init()},
 * which the constructor invokes. Initialization failures are logged rather than thrown; in that
 * case {@link #read()} returns {@code false} and {@link #terminate()} only sets the
 * termination flag.
 */
@Deprecated
/* loaded from: input_file:kieker/analysis/tt/reader/amqp/AMQPReader.class */
public final class AMQPReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPReader.class);

    /** First body byte marking a message that is handed to the registry record handler. */
    private static final byte REGISTRY_RECORD_ID = -1;

    /** First body byte marking a message that is handed to the regular record handler. */
    private static final byte REGULAR_RECORD_ID = 1;

    private final String uri;
    private final String queueName;
    private final int heartbeat;
    private volatile Connection connection;
    private volatile Channel channel;
    private volatile QueueingConsumer consumer;
    private final ReaderRegistry<String> stringRegistry = new ReaderRegistry<>();
    private volatile Thread registryRecordHandlerThread;
    private volatile RegistryRecordHandler registryRecordHandler;
    private volatile RegularRecordHandler regularRecordHandler;
    private volatile Thread regularRecordHandlerThread;
    private volatile boolean terminated;
    private volatile boolean threadsStarted;
    private final Consumer<IMonitoringRecord> elementReceivedCallback;

    /**
     * Creates the reader and immediately tries to initialize it via {@link #init()}.
     *
     * @param uri the URI handed to the connection factory
     * @param queueName the name of the queue to consume from
     * @param heartbeat the requested heartbeat value handed to the connection factory
     * @param elementReceivedCallback the callback invoked by {@link #deliverRecord(IMonitoringRecord)}
     */
    public AMQPReader(String uri, String queueName, int heartbeat,
            Consumer<IMonitoringRecord> elementReceivedCallback) {
        this.uri = uri;
        this.queueName = queueName;
        this.heartbeat = heartbeat;
        this.elementReceivedCallback = elementReceivedCallback;
        init();
    }

    /**
     * Opens the connection and channel, creates the consumer and both record handlers, and
     * prepares (but does not start) one daemon thread per handler.
     *
     * <p>Any failure is logged and not rethrown; the fields that were not yet assigned stay
     * {@code null}.
     */
    public void init() {
        try {
            this.connection = createConnection();
            this.channel = this.connection.createChannel();
            this.consumer = new QueueingConsumer(this.channel);
            this.registryRecordHandler = new RegistryRecordHandler(this.stringRegistry);
            this.regularRecordHandler = new RegularRecordHandler(this, this.stringRegistry);
            this.registryRecordHandlerThread = new Thread(this.registryRecordHandler);
            this.registryRecordHandlerThread.setDaemon(true);
            this.regularRecordHandlerThread = new Thread(this.regularRecordHandler);
            this.regularRecordHandlerThread.setDaemon(true);
        } catch (IOException | URISyntaxException | KeyManagementException
                | NoSuchAlgorithmException | TimeoutException e) {
            handleInitializationError(e);
        }
    }

    /**
     * Logs an initialization failure.
     *
     * @param th the cause of the failure
     */
    private void handleInitializationError(Throwable th) {
        // The message is passed as a separate argument so the placeholder is actually filled;
        // the trailing throwable is still logged with its stack trace.
        LOGGER.error("An error occurred initializing the AMQP reader: {}", th.getMessage(), th);
    }

    /**
     * Creates a new connection from the configured URI and heartbeat.
     *
     * @return the newly created connection
     */
    private Connection createConnection() throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        return connectionFactory.newConnection();
    }

    /**
     * Reports whether {@link #init()} completed, i.e. whether every object that
     * {@link #read()} depends on has been created.
     */
    private boolean isInitialized() {
        return this.channel != null
                && this.consumer != null
                && this.registryRecordHandler != null
                && this.regularRecordHandler != null
                && this.registryRecordHandlerThread != null
                && this.regularRecordHandlerThread != null;
    }

    /**
     * Starts the handler threads on first use, then consumes messages from the queue until
     * the reader is terminated, dispatching each message by its leading type byte.
     *
     * @return {@code true} if the loop ended because of termination or a consumer shutdown
     *         signal; {@code false} if the reader was not initialized, an I/O error occurred,
     *         or the calling thread was interrupted
     */
    public boolean read() {
        if (!isInitialized()) {
            LOGGER.error("Cannot read from queue {}: the AMQP reader was not initialized successfully.",
                    this.queueName);
            return false;
        }
        if (!this.threadsStarted) {
            this.registryRecordHandlerThread.start();
            this.regularRecordHandlerThread.start();
            this.threadsStarted = true;
        }
        try {
            this.channel.basicConsume(this.queueName, true, this.consumer);
            while (!this.terminated) {
                final byte[] body = this.consumer.nextDelivery().getBody();
                if (body == null || body.length == 0) {
                    // Without a type byte the message cannot be dispatched; skip it instead
                    // of letting ByteBuffer.get() abort the whole read loop.
                    LOGGER.error("Received message without record type on queue {}", this.queueName);
                    continue;
                }
                final ByteBuffer buffer = ByteBuffer.wrap(body);
                final byte recordType = buffer.get();
                switch (recordType) {
                    case REGISTRY_RECORD_ID:
                        this.registryRecordHandler.enqueueRegistryRecord(buffer);
                        break;
                    case REGULAR_RECORD_ID:
                        this.regularRecordHandler.enqueueRegularRecord(buffer);
                        break;
                    default:
                        if (LOGGER.isErrorEnabled()) {
                            LOGGER.error(String.format("Unknown record type: %02x", Byte.valueOf(recordType)));
                        }
                        break;
                }
            }
            return true;
        } catch (IOException e) {
            LOGGER.error("Error while reading from queue {}", this.queueName, e);
            return false;
        } catch (InterruptedException e) {
            LOGGER.error("Consumer was interrupted on queue {}", this.queueName, e);
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ShutdownSignalException e) {
            LOGGER.info("Consumer was shut down while waiting on queue {}", this.queueName, e);
            return true;
        }
    }

    /**
     * Marks the reader as terminated and closes the connection, if one was established.
     */
    public void terminate() {
        this.terminated = true;
        final Connection currentConnection = this.connection;
        if (currentConnection == null) {
            // Initialization failed before a connection existed; nothing to close.
            return;
        }
        try {
            currentConnection.close();
        } catch (IOException e) {
            LOGGER.error("IO error while trying to close the connection.", e);
        } catch (ShutdownSignalException e) {
            // Raised by the client when the connection has already been shut down.
            LOGGER.debug("Connection was already shut down.", e);
        }
    }

    /**
     * Passes a record to the callback supplied at construction time.
     *
     * @param iMonitoringRecord the record to deliver
     */
    public void deliverRecord(IMonitoringRecord iMonitoringRecord) {
        this.elementReceivedCallback.accept(iMonitoringRecord);
    }
}
