package me.kisoft.easybus.rabbitmq;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import me.kisoft.easybus.BackingBus;
import me.kisoft.easybus.Listener;
import me.kisoft.easybus.memory.MemoryBackingBusImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/kisoft/easybus/rabbitmq/RabbitMQBackingBusImpl.class */
/**
 * {@link BackingBus} implementation that publishes events to RabbitMQ exchanges and consumes
 * them from queues bound to those exchanges.
 *
 * <p>Outgoing events are serialized to JSON with Jackson and published to the exchange derived
 * from the event class. Incoming messages are deserialized and forwarded to an in-memory bus
 * ({@link #memoryBusImpl}) that holds the actual listeners.
 *
 * <p>NOTE(review): {@link #tagMap} and {@link #channelMap} are keyed by event class, so
 * registering a second listener for the same event class overwrites the bookkeeping of the
 * first one; the earlier channel then can no longer be cancelled or closed by {@link #clear()}.
 */
public class RabbitMQBackingBusImpl extends BackingBus {
    protected static final Logger log = LoggerFactory.getLogger(RabbitMQBackingBusImpl.class);

    /** Prefetch count used by the two-argument constructor. */
    private static final int DEFAULT_MAX_PREFETCH = 10;

    /** Requeue-on-failure setting used by the two-argument constructor. */
    private static final boolean DEFAULT_REQUEUE = true;

    /** Routing key attached to every published message. */
    private static final String PUBLISH_ROUTING_KEY = "all";

    /** Binding key used when a listener declares no {@code RoutingKey} annotation. */
    private static final String DEFAULT_BINDING_KEY = "#";

    /** Connection from which every channel used by this bus is created. */
    protected final Connection connection;

    /** JSON mapper used for both publishing and consuming. */
    protected final ObjectMapper mapper;

    /** Consumer tag returned by {@code basicConsume}, keyed by event class. */
    protected final Map<Class, String> tagMap;

    /** Channel on which the consumer for an event class was registered, keyed by event class. */
    protected final Map<Class, Channel> channelMap;

    /** Names of exchanges already verified or declared, so they are not checked again. */
    protected final Set<String> exchangeList;

    /** In-memory bus that dispatches deserialized messages to the registered listeners. */
    protected final MemoryBackingBusImpl memoryBusImpl;

    /** Serializes exchange verification/declaration across threads. */
    protected final ReentrantLock declarationLock;

    /** Whether an exchange whose declaration failed may be deleted and declared again. */
    protected final boolean allowUpdate;

    /** Value passed to {@code basicQos} for every consumer channel. */
    protected final int maxPrefetch;

    /** Whether a message whose listener failed is requeued when it is nack'ed. */
    protected final boolean requeue;

    /**
     * Creates a bus on top of an existing connection.
     *
     * @param connection  connection used to create all channels
     * @param allowUpdate whether an exchange may be deleted and re-declared when declaring fails
     * @param maxPrefetch prefetch count applied to every consumer channel
     * @param requeue     whether messages are requeued when a listener fails
     */
    public RabbitMQBackingBusImpl(Connection connection, boolean allowUpdate, int maxPrefetch, boolean requeue) {
        this.mapper = new ObjectMapper()
                .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        // Concurrent collections: these are touched by callers of post()/addHandler()/clear()
        // as well as by the consumer cancel callback, and exchangeList is read outside
        // declarationLock as a fast path.
        this.tagMap = new ConcurrentHashMap<>();
        this.channelMap = new ConcurrentHashMap<>();
        this.exchangeList = ConcurrentHashMap.newKeySet();
        this.memoryBusImpl = new MemoryBackingBusImpl();
        this.declarationLock = new ReentrantLock();
        this.connection = connection;
        this.allowUpdate = allowUpdate;
        this.maxPrefetch = maxPrefetch;
        this.requeue = requeue;
    }

    /**
     * Creates a bus with a prefetch count of 10 and requeue-on-failure enabled.
     *
     * @param connection  connection used to create all channels
     * @param allowUpdate whether an exchange may be deleted and re-declared when declaring fails
     */
    public RabbitMQBackingBusImpl(Connection connection, boolean allowUpdate) {
        this(connection, allowUpdate, DEFAULT_MAX_PREFETCH, DEFAULT_REQUEUE);
    }

    /**
     * Serializes {@code obj} to JSON and publishes it to the exchange derived from its class.
     *
     * @param obj event to publish
     * @throws RuntimeException wrapping the {@link IOException} or {@link TimeoutException}
     *                          raised while declaring the exchange, serializing or publishing
     */
    public void post(Object obj) {
        String exchangeName = getExcahngeName(obj);
        try {
            verifyOrUpdateExchange(exchangeName, getExchangeType(obj));
            byte[] payload = this.mapper.writer().writeValueAsBytes(obj);
            // try-with-resources so the channel is released even when publishing fails.
            try (Channel channel = this.connection.createChannel()) {
                channel.basicPublish(exchangeName, PUBLISH_ROUTING_KEY, (AMQP.BasicProperties) null, payload);
            }
            log.debug("Published Message to  Exchange {}", exchangeName);
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Makes sure the named exchange exists, declaring it when a passive check fails.
     *
     * <p>The outcome is cached in {@link #exchangeList}; subsequent calls for the same name
     * return immediately. When declaring fails and {@link #allowUpdate} is set, the exchange is
     * deleted (only if unused) and declared again.
     *
     * <p>NOTE(review): an exchange that already exists is accepted as-is; its type is not
     * compared with {@code exchangeType}.
     *
     * @param exchangeName name of the exchange
     * @param exchangeType type used when the exchange has to be declared
     * @throws IOException      if deleting/re-declaring the exchange fails
     * @throws TimeoutException if closing a helper channel times out
     */
    protected void verifyOrUpdateExchange(String exchangeName, BuiltinExchangeType exchangeType) throws IOException, TimeoutException {
        if (this.exchangeList.contains(exchangeName)) {
            return;
        }
        this.declarationLock.lock();
        try {
            // Re-check under the lock: another thread may have finished the declaration.
            if (this.exchangeList.contains(exchangeName)) {
                return;
            }
            if (exchangeExists(exchangeName)) {
                // Remember the positive result so later calls skip the probe channel.
                this.exchangeList.add(exchangeName);
                return;
            }
            if (declareExchange(exchangeName, exchangeType)) {
                return;
            }
            if (this.allowUpdate) {
                redeclareExchange(exchangeName, exchangeType);
            } else {
                log.warn("Exchange {} could not be declared and updating exchanges is not allowed", exchangeName);
            }
        } finally {
            this.declarationLock.unlock();
        }
    }

    /** Returns whether a passive declare of the exchange succeeds; any I/O failure counts as "no". */
    private boolean exchangeExists(String exchangeName) throws TimeoutException {
        try (Channel channel = this.connection.createChannel()) {
            channel.exchangeDeclarePassive(exchangeName);
            log.debug("Exchange {} already exists", exchangeName);
            return true;
        } catch (IOException e) {
            log.debug("Passive declare of exchange {} failed, treating it as missing", exchangeName);
            return false;
        }
    }

    /** Declares the exchange and caches its name; returns {@code false} when an I/O error occurs. */
    private boolean declareExchange(String exchangeName, BuiltinExchangeType exchangeType) throws TimeoutException {
        try (Channel channel = this.connection.createChannel()) {
            channel.exchangeDeclare(exchangeName, exchangeType);
            log.debug("Declared Exchange {}", exchangeName);
            this.exchangeList.add(exchangeName);
            return true;
        } catch (IOException e) {
            log.warn("Declaring exchange {} failed", exchangeName, e);
            return false;
        }
    }

    /** Deletes the exchange (only if unused), declares it again and caches its name. */
    private void redeclareExchange(String exchangeName, BuiltinExchangeType exchangeType) throws IOException, TimeoutException {
        try (Channel channel = this.connection.createChannel()) {
            channel.exchangeDelete(exchangeName, true);
            channel.exchangeDeclare(exchangeName, exchangeType);
            this.exchangeList.add(exchangeName);
        }
    }

    /** Cancels every tracked consumer and closes every tracked consumer channel. */
    public void clear() {
        // Consumers must be cancelled before their channels are closed.
        clearTags();
        clearChannels();
    }

    /**
     * Clears all consumers and channels, then closes the connection.
     *
     * @throws IOException if closing the connection fails
     */
    public void close() throws IOException {
        // The resource form guarantees the connection is closed even when clear() throws.
        try (Connection ignored = this.connection) {
            clear();
        }
    }

    /** Returns the exchange type for the class of {@code obj}. */
    protected BuiltinExchangeType getExchangeType(Object obj) {
        return getExchangeType(obj.getClass());
    }

    /**
     * Returns the type given by the class's {@code ExchangeType} annotation, or
     * {@link BuiltinExchangeType#FANOUT} when the annotation is absent.
     */
    protected BuiltinExchangeType getExchangeType(Class cls) {
        ExchangeType annotation = (ExchangeType) cls.getAnnotation(ExchangeType.class);
        if (annotation == null || annotation.value() == null) {
            return BuiltinExchangeType.FANOUT;
        }
        return annotation.value();
    }

    /** Returns the queue name for the class of {@code obj}. */
    protected String getQueueName(Object obj) {
        return getQueueName(obj.getClass());
    }

    /**
     * Returns the name given by the class's {@code QueueName} annotation, or the simple class
     * name when the annotation is absent.
     */
    protected String getQueueName(Class cls) {
        QueueName annotation = (QueueName) cls.getAnnotation(QueueName.class);
        return annotation != null ? annotation.value() : cls.getSimpleName();
    }

    /** Returns the exchange name for the class of {@code obj}. */
    protected String getExcahngeName(Object obj) {
        return getExcahngeName(obj.getClass());
    }

    /**
     * Returns the name given by the class's {@code ExchangeName} annotation, or the simple class
     * name when the annotation is absent.
     */
    protected String getExcahngeName(Class cls) {
        ExchangeName annotation = (ExchangeName) cls.getAnnotation(ExchangeName.class);
        return annotation != null ? annotation.value() : cls.getSimpleName();
    }

    /** Returns the binding keys for the class of {@code obj}. */
    protected Set<String> getRoutingKeys(Object obj) {
        return getRoutingKeys(obj.getClass());
    }

    /**
     * Returns the values of all {@code RoutingKey} annotations on the class, or a single
     * {@code "#"} key when there are none.
     */
    protected Set<String> getRoutingKeys(Class cls) {
        RoutingKey[] annotations = (RoutingKey[]) cls.getAnnotationsByType(RoutingKey.class);
        if (annotations == null || annotations.length == 0) {
            return Set.of(DEFAULT_BINDING_KEY);
        }
        // Collecting into a Set already removes duplicates.
        return Arrays.stream(annotations).map(RoutingKey::value).collect(Collectors.toSet());
    }

    /**
     * Closes every tracked consumer channel and empties {@link #channelMap}. A failure to close
     * one channel is logged and does not prevent the remaining channels from being closed.
     */
    protected void clearChannels() {
        try {
            for (Channel channel : this.channelMap.values()) {
                closeChannelQuietly(channel);
            }
        } catch (RuntimeException e) {
            log.error(e.getMessage(), e);
        } finally {
            this.channelMap.clear();
        }
    }

    /** Closes the channel if it is non-null and still open; failures are logged, never thrown. */
    private void closeChannelQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            if (channel.isOpen()) {
                channel.close();
            }
        } catch (IOException | TimeoutException | RuntimeException e) {
            log.warn("Failed to close channel {} : {}", Integer.valueOf(channel.getChannelNumber()), e);
        }
    }

    /**
     * Cancels every tracked consumer and empties {@link #tagMap}.
     *
     * <p>Each consumer is cancelled on the channel recorded for the same event class in
     * {@link #channelMap}, because that is the channel the consumer tag was registered on.
     */
    protected void clearTags() {
        try {
            for (Map.Entry<Class, String> entry : this.tagMap.entrySet()) {
                cancelConsumer(this.channelMap.get(entry.getKey()), entry.getValue());
            }
        } catch (RuntimeException e) {
            log.error(e.getMessage(), e);
        } finally {
            this.tagMap.clear();
        }
    }

    /** Cancels one consumer on its own channel; failures are logged, never thrown. */
    private void cancelConsumer(Channel channel, String consumerTag) {
        if (channel == null || !channel.isOpen()) {
            log.debug("No open channel for consumer tag {}, nothing to cancel", consumerTag);
            return;
        }
        try {
            channel.basicCancel(consumerTag);
        } catch (IOException | RuntimeException e) {
            log.warn("Failed to close tag {} : {}", consumerTag, e);
        }
    }

    /**
     * Declares the listener's queue, binds it to the event's exchange and starts a consumer
     * that forwards every decoded message to the in-memory bus.
     *
     * <p>Message settlement in the consumer:
     * <ul>
     *   <li>handled successfully: ack;</li>
     *   <li>body cannot be decoded as JSON: ack, so the undecodable message is dropped;</li>
     *   <li>any other {@link IOException}: logged, message left unsettled;</li>
     *   <li>any other failure: nack, requeued according to {@link #requeue}.</li>
     * </ul>
     *
     * @param cls      event class; determines exchange name and type
     * @param listener listener; its class determines queue name and binding keys
     * @throws RuntimeException wrapping the {@link IOException} or {@link TimeoutException}
     *                          raised during setup
     */
    protected void addHandler(Class cls, Listener listener) {
        final String exchangeName = getExcahngeName(cls);
        final BuiltinExchangeType exchangeType = getExchangeType(cls);
        final String queueName = getQueueName(listener);
        final Set<String> routingKeys = getRoutingKeys(listener);

        Channel opened = null;
        boolean channelTracked = false;
        try {
            final Channel channel = this.connection.createChannel();
            opened = channel;
            final ObjectReader reader = this.mapper.reader().forType(cls);
            verifyOrUpdateExchange(exchangeName, exchangeType);
            final String queue = channel.queueDeclare(queueName, false, false, false, (Map) null).getQueue();
            for (String routingKey : routingKeys) {
                channel.queueBind(queue, exchangeName, routingKey);
            }

            DeliverCallback onDelivery = (consumerTag, delivery) -> {
                final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                log.trace("Received Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{exchangeName, queueName, Long.valueOf(deliveryTag)});
                try {
                    this.memoryBusImpl.post(reader.readValue(delivery.getBody()));
                    channel.basicAck(deliveryTag, false);
                } catch (JsonProcessingException e) {
                    // Must precede the IOException handler (it is a subclass): a body that
                    // cannot be decoded will never succeed, so acknowledge it to discard it.
                    log.warn("Error Decoding message from Exchange {}, class {}: {} ", new Object[]{exchangeName, cls, e});
                    channel.basicAck(deliveryTag, false);
                } catch (IOException e) {
                    // Left unsettled: this may come from basicAck itself, in which case
                    // another settlement attempt on the same channel is not expected to work.
                    log.error("Exception when processing message from rabbitMQ : {}", e);
                } catch (Throwable th) {
                    log.info("Failure when processing event of type {}, Listener {} : {}", new Object[]{cls, listener, th});
                    channel.basicNack(deliveryTag, false, this.requeue);
                } finally {
                    log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{exchangeName, queueName, Long.valueOf(deliveryTag)});
                }
            };
            // NOTE(review): the channel is only dropped from the bookkeeping here, not closed.
            CancelCallback onCancel = consumerTag -> {
                this.tagMap.remove(cls);
                this.channelMap.remove(cls);
            };
            ConsumerShutdownSignalCallback onShutdown = (consumerTag, signal) -> {
                if (signal.isHardError()) {
                    log.error("Channel for Queue(Event) Listener {} was closed abnormaly : {}", queueName, signal);
                } else {
                    log.warn("Channel for Queue(Event) Listener {} was closed normally : {}", queueName, signal);
                }
            };

            channel.basicQos(this.maxPrefetch);
            channel.setDefaultConsumer(new DefaultConsumer(channel));
            this.tagMap.put(cls, channel.basicConsume(queueName, false, onDelivery, onCancel, onShutdown));
            this.channelMap.put(cls, channel);
            // From here on clear() owns the channel.
            channelTracked = true;
            // NOTE(review): the listener is registered after consumption has started, so a
            // message delivered in between reaches the in-memory bus before its handler exists.
            this.memoryBusImpl.addHandler(cls, listener);
        } catch (IOException | TimeoutException e) {
            log.error("Failed to add listener {} : {}", listener, e);
            throw new RuntimeException(e);
        } finally {
            if (!channelTracked) {
                // Setup failed part-way: do not leak the half-configured channel.
                closeChannelQuietly(opened);
            }
        }
    }
}
