package kieker.monitoring.writer.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.io.BinaryValueSerializer;
import kieker.common.registry.IRegistryListener;
import kieker.common.registry.writer.WriterRegistry;
import kieker.common.util.thread.DaemonThreadFactory;
import kieker.monitoring.writer.AbstractMonitoringWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kieker/monitoring/writer/amqp/AmqpWriter.class */
/**
 * Monitoring writer that publishes every monitoring record as one binary AMQP message
 * through the RabbitMQ Java client.
 *
 * <p>Two kinds of messages are produced, distinguished by their first byte:
 * <ul>
 *   <li>{@link #REGULAR_RECORD_ID}: marker (1 byte), registry id (8 bytes), id of the record's
 *       class name (4 bytes), logging timestamp (8 bytes), followed by the serialized record.</li>
 *   <li>{@link #REGISTRY_RECORD_ID}: marker (1 byte), registry id (8 bytes), string id (4 bytes),
 *       UTF-8 length (4 bytes), followed by the UTF-8 bytes of the registered string.</li>
 * </ul>
 *
 * <p>NOTE(review): the internal buffers are reused without synchronization; presumably the
 * writer is driven by a single thread - confirm against the caller.
 */
public class AmqpWriter extends AbstractMonitoringWriter implements IRegistryListener<String> {
    /** Marker written as the first byte of a registry-entry message. */
    public static final byte REGISTRY_RECORD_ID = -1;
    /** Marker written as the first byte of a regular record message. */
    public static final byte REGULAR_RECORD_ID = 1;

    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpWriter.class);
    private static final String PREFIX = AmqpWriter.class.getName() + ".";

    /** Configuration key of the URI handed to the connection factory. */
    public static final String CONFIG_URI = PREFIX + "uri";
    /** Configuration key of the exchange the messages are published to. */
    public static final String CONFIG_EXCHANGENAME = PREFIX + "exchangename";
    /** Configuration key of the queue name, which is used as the routing key when publishing. */
    public static final String CONFIG_QUEUENAME = PREFIX + "queuename";
    /** Configuration key of the requested heartbeat; a value of 0 selects the default of 60. */
    public static final String CONFIG_HEARTBEAT = PREFIX + "heartbeat";

    /** Capacity in bytes of each message buffer. */
    private static final int DEFAULT_BUFFER_SIZE = 16384;
    /** Common message prefix: marker (1 byte) + registry id (8 bytes). */
    private static final int SIZE_OF_ENVELOPE = 9;
    /** Envelope + class-name id (4 bytes) + logging timestamp (8 bytes). */
    private static final int RECORD_HEADER_SIZE = SIZE_OF_ENVELOPE + 4 + 8;
    /** Envelope + string id (4 bytes) + string length (4 bytes). */
    private static final int REGISTRY_HEADER_SIZE = SIZE_OF_ENVELOPE + 4 + 4;
    /** Heartbeat used when the configured value is 0. */
    private static final int DEFAULT_HEARTBEAT = 60;

    private final String uri;
    private final String exchangeName;
    private final String queueName;
    private final int heartbeat;
    /** Scratch buffer for regular record messages. */
    private final ByteBuffer buffer;
    /** Scratch buffer for registry-entry messages, kept separate from the record buffer. */
    private final ByteBuffer registryBuffer;
    private final Connection connection;
    private final Channel channel;
    private final WriterRegistry writerRegistry;

    /**
     * Reads the writer settings from the configuration and opens the AMQP connection and channel.
     *
     * @param configuration source of the {@code CONFIG_*} properties
     * @throws KeyManagementException propagated from the connection factory's URI handling
     * @throws NoSuchAlgorithmException propagated from the connection factory's URI handling
     * @throws URISyntaxException propagated from the connection factory's URI handling
     * @throws IOException if the connection or the channel cannot be created
     * @throws TimeoutException if establishing the connection times out
     */
    public AmqpWriter(Configuration configuration) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        super(configuration);
        this.uri = configuration.getStringProperty(CONFIG_URI);
        this.exchangeName = configuration.getStringProperty(CONFIG_EXCHANGENAME);
        this.queueName = configuration.getStringProperty(CONFIG_QUEUENAME);
        final int configuredHeartbeat = configuration.getIntProperty(CONFIG_HEARTBEAT);
        this.heartbeat = (configuredHeartbeat == 0) ? DEFAULT_HEARTBEAT : configuredHeartbeat;
        this.buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
        this.registryBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);

        final Connection openedConnection = createConnection();
        Channel openedChannel;
        try {
            openedChannel = openedConnection.createChannel();
            if (openedChannel == null) {
                // The client documents a null return when no channel is available.
                throw new IOException("No AMQP channel available on the newly created connection");
            }
        } catch (IOException | RuntimeException e) {
            // Do not leave the connection open when the writer cannot be constructed.
            closeAfterFailedSetup(openedConnection);
            throw e;
        }
        this.connection = openedConnection;
        this.channel = openedChannel;
        this.writerRegistry = new WriterRegistry(this);
    }

    /**
     * Creates a connection for the configured URI and heartbeat, using daemon threads.
     */
    private Connection createConnection() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.uri);
        connectionFactory.setRequestedHeartbeat(this.heartbeat);
        connectionFactory.setThreadFactory(new DaemonThreadFactory());
        return connectionFactory.newConnection();
    }

    /**
     * Closes a connection whose channel setup failed; close errors are only logged so that
     * they do not mask the original failure.
     */
    private static void closeAfterFailedSetup(final Connection openedConnection) {
        try {
            openedConnection.close();
        } catch (IOException | RuntimeException closeFailure) {
            LOGGER.warn("Error closing connection after failed channel setup", closeFailure);
        }
    }

    /** Nothing to do on start; connection and channel are already opened by the constructor. */
    @Override
    public void onStarting() {
    }

    /**
     * Serializes the record behind a regular-record header and publishes it as one message.
     *
     * @param monitoringRecord the record to publish
     * @throws IllegalStateException if header plus reported record size exceed the buffer capacity
     */
    @Override
    public void writeMonitoringRecord(IMonitoringRecord monitoringRecord) {
        final ByteBuffer recordBuffer = this.buffer;
        final String recordClassName = monitoringRecord.getClass().getName();
        final int requiredSize = RECORD_HEADER_SIZE + monitoringRecord.getSize();
        if (recordBuffer.capacity() < requiredSize) {
            throw new IllegalStateException("Insufficient buffer capacity for monitoring record of type "
                    + recordClassName + ": required " + requiredSize + " bytes, available " + recordBuffer.capacity());
        }
        // Registering first lets the listener publish the registry entry before the record that uses it.
        this.writerRegistry.register(recordClassName);
        // Drop leftovers of a previous write that was aborted by an exception.
        recordBuffer.clear();
        recordBuffer.put(REGULAR_RECORD_ID);
        recordBuffer.putLong(this.writerRegistry.getId());
        recordBuffer.putInt(this.writerRegistry.getId(recordClassName));
        recordBuffer.putLong(monitoringRecord.getLoggingTimestamp());
        monitoringRecord.serialize(BinaryValueSerializer.create(recordBuffer, this.writerRegistry));
        publishBuffer(recordBuffer);
    }

    /**
     * Publishes a registry-entry message for a newly registered string.
     *
     * <p>NOTE(review): a dedicated buffer is used so that a notification arriving while a record
     * is being serialized (if the registry ever does that - confirm) cannot overwrite the
     * half-written record.
     *
     * @param value the registered string
     * @param id the id assigned to the string
     * @throws IllegalStateException if header plus encoded string exceed the buffer capacity
     */
    @Override
    public void onNewRegistryEntry(String value, int id) {
        final ByteBuffer entryBuffer = this.registryBuffer;
        final byte[] encodedValue = value.getBytes(StandardCharsets.UTF_8);
        if (entryBuffer.capacity() < REGISTRY_HEADER_SIZE + encodedValue.length) {
            throw new IllegalStateException("Insufficient buffer capacity for string registry data");
        }
        // Drop leftovers of a previous write that was aborted by an exception.
        entryBuffer.clear();
        entryBuffer.put(REGISTRY_RECORD_ID);
        entryBuffer.putLong(this.writerRegistry.getId());
        entryBuffer.putInt(id);
        entryBuffer.putInt(encodedValue.length);
        entryBuffer.put(encodedValue);
        publishBuffer(entryBuffer);
    }

    /**
     * Copies the bytes written so far out of the buffer, resets the buffer for reuse and
     * publishes the copy to the configured exchange with the queue name as routing key.
     * An {@link IOException} from the channel is logged and not propagated.
     */
    private void publishBuffer(ByteBuffer byteBuffer) {
        byteBuffer.flip();
        final byte[] payload = new byte[byteBuffer.remaining()];
        byteBuffer.get(payload);
        byteBuffer.clear();
        try {
            this.channel.basicPublish(this.exchangeName, this.queueName, (AMQP.BasicProperties) null, payload);
        } catch (IOException e) {
            LOGGER.error("An exception occurred", e);
        }
    }

    /** Closes the AMQP connection; an {@link IOException} while closing is logged. */
    @Override
    public void onTerminating() {
        try {
            this.connection.close();
        } catch (IOException e) {
            LOGGER.error("Error closing connection", e);
        }
    }
}
