package me.kisoft.easybus.rabbitmq;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import lombok.NonNull;
import me.kisoft.easybus.BackingBus;
import me.kisoft.easybus.Listener;
import me.kisoft.easybus.memory.MemoryBackingBusImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/kisoft/easybus/rabbitmq/RabbitMQBackingBusImpl.class */
public class RabbitMQBackingBusImpl extends BackingBus {
    /** Logger shared by this bus and its nested consumer. */
    protected static final Logger log = LoggerFactory.getLogger(RabbitMQBackingBusImpl.class);

    /** Connection from which a channel is opened for every publish, declaration and consumer. */
    @NonNull
    private final Connection connection;
    /** Serializes posted events to bytes and supplies the readers used by consumers. */
    private final ObjectMapper mapper;
    /**
     * Names of exchanges this instance has already verified or declared.
     * {@code verifyOrUpdateExchange} reads it before taking {@link #declarationLock}, so it has
     * to tolerate concurrent reads and writes; a plain {@code HashSet} does not.
     */
    private final Set<String> exchangeSet = ConcurrentHashMap.newKeySet();
    /** Serializes exchange verification/declaration across threads. */
    private final ReentrantLock declarationLock = new ReentrantLock();
    /** Single thread on which delayed listener (re)binding attempts are scheduled. */
    private final ScheduledExecutorService rebindingExecutor = Executors.newSingleThreadScheduledExecutor(
            runnable -> new Thread(runnable, "rabbitmq-binding-pool"));
    /** In-memory bus that received events are posted to and that listeners are registered on. */
    private final MemoryBackingBusImpl memoryBusImpl;
    /** Whether an exchange whose declaration failed may be deleted and declared again. */
    private final boolean allowUpdate;
    /** Channel prefetch count and size of each consumer's ingestor thread pool. */
    private final int maxPrefetch;
    /** Requeue flag passed to {@code basicNack} when processing an event fails. */
    private final boolean requeue;
    /** Maximum number of attempts when adding a listener. */
    private final int retries;
    /** Base delay, in milliseconds, between listener binding attempts. */
    private final int retryThresholdMillis;

    /* loaded from: input_file:me/kisoft/easybus/rabbitmq/RabbitMQBackingBusImpl$NamedIngestorThreadFactory.class */
    /**
     * Thread factory that names its threads {@code "<prefix> pool-<p> ingestor-<t>"}, where
     * {@code p} is a number assigned once per factory instance and {@code t} counts the threads
     * created by that instance, both starting at 1.
     */
    protected static class NamedIngestorThreadFactory implements ThreadFactory {
        /** Class-wide counter handing out one pool number per factory. */
        private static final AtomicInteger poolNumber = new AtomicInteger(1);

        private final String namePrefix;
        /** Pool number claimed by this factory when it was instantiated. */
        private final int pool = poolNumber.getAndIncrement();
        /** Number to give to the next thread created by this factory. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        /**
         * @param str prefix placed at the start of every thread name
         */
        public NamedIngestorThreadFactory(String str) {
            this.namePrefix = str;
        }

        /**
         * Creates a new, unstarted thread for the given task with the next sequential name.
         *
         * @param runnable task the thread will run
         * @return the named thread
         */
        @Override
        public Thread newThread(Runnable runnable) {
            final int sequence = threadNumber.getAndIncrement();
            final String threadName = namePrefix + " pool-" + pool + " ingestor-" + sequence;
            return new Thread(runnable, threadName);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:me/kisoft/easybus/rabbitmq/RabbitMQBackingBusImpl$RabbitMQBackingBusConsumer.class */
    public class RabbitMQBackingBusConsumer extends DefaultConsumer {
        private final ExecutorService executor;
        private final Class eventClass;
        private final Listener eventListener;
        private final String exchangeName;
        private final String queueName;
        private final ObjectReader reader;
        private boolean shutdown;

        public RabbitMQBackingBusConsumer(Channel channel, Class cls, Listener listener) {
            super(channel);
            this.shutdown = false;
            this.eventClass = cls;
            this.eventListener = listener;
            this.exchangeName = RabbitMQBackingBusImpl.this.getExcahngeName(cls);
            this.queueName = RabbitMQBackingBusImpl.this.getQueueName(listener);
            this.reader = RabbitMQBackingBusImpl.this.mapper.reader().forType(cls);
            this.executor = Executors.newFixedThreadPool(RabbitMQBackingBusImpl.this.maxPrefetch, new NamedIngestorThreadFactory(String.format("queue-%s", this.queueName)));
            RabbitMQBackingBusImpl.this.memoryBusImpl.addListener(cls, listener);
            this.shutdown = false;
        }

        public void handleConsumeOk(String str) {
            RabbitMQBackingBusImpl.log.info("Added Consumer {} for Queue {} exchange {}", new Object[]{str, this.queueName, this.exchangeName});
            this.shutdown = false;
        }

        public void handleCancel(String str) throws IOException {
            this.shutdown = true;
            RabbitMQBackingBusImpl.log.warn("Force Cancelling Consumer for Listener {} , event {}", this.queueName, this.exchangeName);
            this.executor.shutdownNow();
            try {
                this.executor.awaitTermination(RabbitMQBackingBusImpl.this.retryThresholdMillis * RabbitMQBackingBusImpl.this.retries, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                RabbitMQBackingBusImpl.log.warn("Exception while awaiting excutor termination : {} ", e.getMessage());
            }
            RabbitMQBackingBusImpl.log.warn("Force Cancelled Consumer for Listener {} , event {}", this.queueName, this.exchangeName);
        }

        public void handleCancelOk(String str) {
            this.shutdown = true;
            RabbitMQBackingBusImpl.log.warn("Cancelling Consumer for Listener {} , event {}", this.queueName, this.exchangeName);
            if (getChannel().isOpen()) {
                try {
                    getChannel().close();
                } catch (IOException | TimeoutException e) {
                    RabbitMQBackingBusImpl.log.error("Exception while attempting to close consumer : {}", e.getMessage());
                }
            }
            RabbitMQBackingBusImpl.log.warn("Cancelled Consumer for Listener {} , event {}", this.queueName, this.exchangeName);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            this.shutdown = true;
            RabbitMQBackingBusImpl.log.info("Consumer for Queue(Event) Listener {} was shutdown : {}", this.queueName, shutdownSignalException.getMessage());
            this.executor.shutdownNow();
            if (shutdownSignalException.isHardError()) {
                RabbitMQBackingBusImpl.log.warn("Consumer for Queue(Event) Listener {} was closed abnormaly : {}", this.queueName, shutdownSignalException.getReason());
            } else {
                RabbitMQBackingBusImpl.log.warn("Consumer for Queue(Event) Listener {} was closed normally : {}", this.queueName, shutdownSignalException.getReason());
            }
            try {
                this.executor.awaitTermination(RabbitMQBackingBusImpl.this.retryThresholdMillis * RabbitMQBackingBusImpl.this.retries, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                RabbitMQBackingBusImpl.log.warn("Exception while awaiting excutor termination : {} ", e.getMessage());
            }
            if (shutdownSignalException.isInitiatedByApplication()) {
                RabbitMQBackingBusImpl.log.warn("Consumer for Queue(Event) Listener {} was shutdown permanently by applicaiton : {}", this.queueName, shutdownSignalException.getMessage());
            } else {
                RabbitMQBackingBusImpl.this.rebindingExecutor.schedule(() -> {
                    RabbitMQBackingBusImpl.log.warn("Attempting to rebind Consumer for Queue(Event) Listener {}", this.queueName);
                    RabbitMQBackingBusImpl.this.doAddListener(this.eventClass, this.eventListener, 1, RabbitMQBackingBusImpl.this.retries);
                }, RabbitMQBackingBusImpl.this.retryThresholdMillis, TimeUnit.MILLISECONDS);
            }
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            try {
                this.executor.submit(() -> {
                    Object obj;
                    boolean z = false;
                    RabbitMQBackingBusImpl.log.trace("Received Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, Long.valueOf(deliveryTag)});
                    try {
                        obj = this.reader.readValue(bArr);
                    } catch (Throwable th) {
                        RabbitMQBackingBusImpl.log.warn("Error Decoding message from Exchange {}, class {} : {} ", new Object[]{this.exchangeName, this.eventClass, th.getMessage()});
                        z = true;
                        obj = null;
                    }
                    if (obj != null) {
                        try {
                            try {
                                RabbitMQBackingBusImpl.this.memoryBusImpl.post(obj);
                                z = true;
                                RabbitMQBackingBusImpl.log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, Long.valueOf(deliveryTag)});
                            } catch (Throwable th2) {
                                RabbitMQBackingBusImpl.log.warn("Failure when processing event of type {}, Listener {} : {}", new Object[]{this.eventClass, this.eventListener, th2.getMessage()});
                                z = false;
                                RabbitMQBackingBusImpl.log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, Long.valueOf(deliveryTag)});
                            }
                        } catch (Throwable th3) {
                            RabbitMQBackingBusImpl.log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, Long.valueOf(deliveryTag)});
                            throw th3;
                        }
                    }
                    if (this.shutdown) {
                        RabbitMQBackingBusImpl.log.info("Consumer is already shutdown; skipping ackgnowledgement");
                        return;
                    }
                    try {
                        if (z) {
                            getChannel().basicAck(deliveryTag, false);
                        } else {
                            getChannel().basicNack(deliveryTag, false, RabbitMQBackingBusImpl.this.requeue);
                        }
                    } catch (IOException e) {
                        RabbitMQBackingBusImpl.log.error("RabbitMQ Exception when processing Message from Exchange {} Queue {} with Delivery Tag {} : {}", new Object[]{this.exchangeName, this.queueName, Long.valueOf(deliveryTag), e.getMessage()});
                    } catch (Throwable th4) {
                        RabbitMQBackingBusImpl.log.error("Exception when processing Message from Exchange {} Queue {} with Delivery Tag {} : {}", new Object[]{this.exchangeName, this.queueName, Long.valueOf(deliveryTag), th4.getMessage()});
                    }
                });
            } catch (Exception e) {
                RabbitMQBackingBusImpl.log.warn("Could not schedule message for processing : {}", e.getMessage());
                getChannel().basicNack(deliveryTag, false, true);
            }
        }
    }

    /* loaded from: input_file:me/kisoft/easybus/rabbitmq/RabbitMQBackingBusImpl$RabbitMQBackingBusImplBuilder.class */
    /**
     * Fluent builder for {@link RabbitMQBackingBusImpl}. Every option except the connection has
     * a default that {@link #build()} applies when the corresponding setter was never called;
     * the {@code $set} flags record which setters were called.
     */
    public static class RabbitMQBackingBusImplBuilder {
        private Connection connection;
        private boolean mapper$set;
        private ObjectMapper mapper$value;
        private boolean memoryBusImpl$set;
        private MemoryBackingBusImpl memoryBusImpl$value;
        private boolean allowUpdate$set;
        private boolean allowUpdate$value;
        private boolean maxPrefetch$set;
        private int maxPrefetch$value;
        private boolean requeue$set;
        private boolean requeue$value;
        private boolean retries$set;
        private int retries$value;
        private boolean retryThresholdMillis$set;
        private int retryThresholdMillis$value;

        RabbitMQBackingBusImplBuilder() {
        }

        /**
         * Sets the connection the bus will use.
         *
         * @throws NullPointerException if {@code connection} is null
         */
        public RabbitMQBackingBusImplBuilder connection(@NonNull Connection connection) {
            this.connection = java.util.Objects.requireNonNull(connection, "connection is marked non-null but is null");
            return this;
        }

        /** Overrides the default object mapper. */
        public RabbitMQBackingBusImplBuilder mapper(ObjectMapper objectMapper) {
            this.mapper$set = true;
            this.mapper$value = objectMapper;
            return this;
        }

        /** Overrides the default in-memory bus. */
        public RabbitMQBackingBusImplBuilder memoryBusImpl(MemoryBackingBusImpl memoryBackingBusImpl) {
            this.memoryBusImpl$set = true;
            this.memoryBusImpl$value = memoryBackingBusImpl;
            return this;
        }

        /** Overrides the default {@code allowUpdate} flag. */
        public RabbitMQBackingBusImplBuilder allowUpdate(boolean z) {
            this.allowUpdate$set = true;
            this.allowUpdate$value = z;
            return this;
        }

        /** Overrides the default {@code maxPrefetch} value. */
        public RabbitMQBackingBusImplBuilder maxPrefetch(int i) {
            this.maxPrefetch$set = true;
            this.maxPrefetch$value = i;
            return this;
        }

        /** Overrides the default {@code requeue} flag. */
        public RabbitMQBackingBusImplBuilder requeue(boolean z) {
            this.requeue$set = true;
            this.requeue$value = z;
            return this;
        }

        /** Overrides the default {@code retries} value. */
        public RabbitMQBackingBusImplBuilder retries(int i) {
            this.retries$set = true;
            this.retries$value = i;
            return this;
        }

        /** Overrides the default {@code retryThresholdMillis} value. */
        public RabbitMQBackingBusImplBuilder retryThresholdMillis(int i) {
            this.retryThresholdMillis$set = true;
            this.retryThresholdMillis$value = i;
            return this;
        }

        /**
         * Creates the bus, substituting the class defaults for every option that was not set.
         *
         * @throws NullPointerException if no connection was supplied
         */
        public RabbitMQBackingBusImpl build() {
            final ObjectMapper effectiveMapper =
                    this.mapper$set ? this.mapper$value : RabbitMQBackingBusImpl.$default$mapper();
            final MemoryBackingBusImpl effectiveMemoryBus =
                    this.memoryBusImpl$set ? this.memoryBusImpl$value : RabbitMQBackingBusImpl.$default$memoryBusImpl();
            final boolean effectiveAllowUpdate =
                    this.allowUpdate$set ? this.allowUpdate$value : RabbitMQBackingBusImpl.$default$allowUpdate();
            final int effectiveMaxPrefetch =
                    this.maxPrefetch$set ? this.maxPrefetch$value : RabbitMQBackingBusImpl.$default$maxPrefetch();
            final boolean effectiveRequeue =
                    this.requeue$set ? this.requeue$value : RabbitMQBackingBusImpl.$default$requeue();
            final int effectiveRetries =
                    this.retries$set ? this.retries$value : RabbitMQBackingBusImpl.$default$retries();
            final int effectiveRetryThreshold =
                    this.retryThresholdMillis$set ? this.retryThresholdMillis$value : RabbitMQBackingBusImpl.$default$retryThresholdMillis();
            return new RabbitMQBackingBusImpl(
                    this.connection,
                    effectiveMapper,
                    effectiveMemoryBus,
                    effectiveAllowUpdate,
                    effectiveMaxPrefetch,
                    effectiveRequeue,
                    effectiveRetries,
                    effectiveRetryThreshold);
        }

        /** Returns a description listing the connection and every explicitly stored value. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("RabbitMQBackingBusImpl.RabbitMQBackingBusImplBuilder(connection=");
            text.append(this.connection);
            text.append(", mapper$value=").append(this.mapper$value);
            text.append(", memoryBusImpl$value=").append(this.memoryBusImpl$value);
            text.append(", allowUpdate$value=").append(this.allowUpdate$value);
            text.append(", maxPrefetch$value=").append(this.maxPrefetch$value);
            text.append(", requeue$value=").append(this.requeue$value);
            text.append(", retries$value=").append(this.retries$value);
            text.append(", retryThresholdMillis$value=").append(this.retryThresholdMillis$value);
            return text.append(")").toString();
        }
    }

    /**
     * Publishes an event to its exchange with routing key {@code "all"}, serialized with the
     * configured mapper. The exchange is verified (and declared if needed) first.
     *
     * @param obj event to publish; its class determines the exchange name and type
     * @throws RuntimeException wrapping any {@link IOException} or {@link TimeoutException}
     *         raised while opening the channel, declaring, serializing, publishing or closing
     */
    public void post(Object obj) {
        final String exchange = getExcahngeName(obj);
        // try-with-resources guarantees the channel is closed even when verification or
        // publishing fails; previously a failure left the channel open.
        try (Channel channel = this.connection.createChannel()) {
            verifyOrUpdateExchange(exchange, getExchangeType(obj));
            channel.basicPublish(exchange, "all", (AMQP.BasicProperties) null, this.mapper.writer().writeValueAsBytes(obj));
            log.debug("Published Message to  Exchange {}", exchange);
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Ensures the named exchange exists, declaring it when a passive lookup fails. When the
     * declaration itself fails and {@code allowUpdate} is set, the exchange is deleted (only if
     * unused) and declared again. Names that were found or declared are remembered so that later
     * calls return immediately.
     *
     * <p>NOTE(review): the first {@code contains} check runs without {@code declarationLock};
     * confirm {@code exchangeSet} is a set that is safe to read concurrently.
     *
     * @param str                 exchange name
     * @param builtinExchangeType type to declare the exchange with
     * @throws IOException      if the delete-and-redeclare step fails
     * @throws TimeoutException if closing a channel times out
     */
    protected void verifyOrUpdateExchange(String str, BuiltinExchangeType builtinExchangeType) throws IOException, TimeoutException {
        if (this.exchangeSet.contains(str)) {
            return;
        }
        this.declarationLock.lock();
        try {
            // Re-check under the lock: another thread may have finished the declaration.
            if (this.exchangeSet.contains(str)) {
                return;
            }
            if (exchangeExists(str)) {
                // Remember pre-existing exchanges too; otherwise every call would open a
                // channel and probe the broker again while holding the lock.
                this.exchangeSet.add(str);
                return;
            }
            if (tryDeclareExchange(str, builtinExchangeType)) {
                return;
            }
            if (!this.allowUpdate) {
                log.warn("Exchange {} could not be declared and updates are not allowed", str);
                return;
            }
            try (Channel channel = this.connection.createChannel()) {
                channel.exchangeDelete(str, true);
                channel.exchangeDeclare(str, builtinExchangeType);
                this.exchangeSet.add(str);
            }
        } finally {
            this.declarationLock.unlock();
        }
    }

    /** Returns true when a passive declaration of the exchange succeeds on a fresh channel. */
    private boolean exchangeExists(String name) throws TimeoutException {
        try (Channel channel = this.connection.createChannel()) {
            channel.exchangeDeclarePassive(name);
            log.debug("Exchange {} already exists", name);
            return true;
        } catch (IOException e) {
            // Any I/O failure here is treated as "exchange not found".
            return false;
        }
    }

    /** Declares the exchange and records it; returns false when the declaration fails. */
    private boolean tryDeclareExchange(String name, BuiltinExchangeType type) throws TimeoutException {
        try (Channel channel = this.connection.createChannel()) {
            channel.exchangeDeclare(name, type);
            log.debug("Declared Exchange {}", name);
            this.exchangeSet.add(name);
            return true;
        } catch (IOException e) {
            log.debug("Declaring Exchange {} failed : {}", name, e.getMessage());
            return false;
        }
    }

    /** Clears the in-memory bus that this bus delegates event handling to. */
    public void clear() {
        final MemoryBackingBusImpl delegate = this.memoryBusImpl;
        delegate.clear();
    }

    /**
     * Releases everything this bus owns: stops the rebinding scheduler, closes the in-memory bus
     * and closes the connection. The connection is closed even when closing the in-memory bus
     * fails; if both fail, the second failure is attached to the first as suppressed.
     *
     * @throws Exception the first failure raised while closing the in-memory bus or connection
     */
    public void close() throws Exception {
        // Stop pending (re)binding attempts first; otherwise the scheduler thread would outlive
        // the bus and keep trying to bind on a closed connection.
        this.rebindingExecutor.shutdownNow();
        Exception failure = null;
        try {
            this.memoryBusImpl.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Resolves the exchange type for an event instance from its runtime class.
     *
     * @param obj event instance; must not be null
     */
    protected BuiltinExchangeType getExchangeType(Object obj) {
        final Class eventType = obj.getClass();
        return getExchangeType(eventType);
    }

    /**
     * Resolves the exchange type for an event class: the value of its {@code ExchangeType}
     * annotation when present and non-null, otherwise {@code FANOUT}.
     *
     * @param cls event class
     */
    protected BuiltinExchangeType getExchangeType(Class cls) {
        final ExchangeType declared = (ExchangeType) cls.getAnnotation(ExchangeType.class);
        if (declared != null && declared.value() != null) {
            return declared.value();
        }
        return BuiltinExchangeType.FANOUT;
    }

    /**
     * Resolves the queue name for a listener instance from its runtime class.
     *
     * @param obj listener instance; must not be null
     */
    protected String getQueueName(Object obj) {
        final Class listenerType = obj.getClass();
        return getQueueName(listenerType);
    }

    /**
     * Resolves the queue name for a class: the value of its {@code QueueName} annotation when
     * present, otherwise the class' simple name.
     *
     * @param cls listener class
     */
    protected String getQueueName(Class cls) {
        final QueueName declared = (QueueName) cls.getAnnotation(QueueName.class);
        if (declared == null) {
            return cls.getSimpleName();
        }
        return declared.value();
    }

    /**
     * Resolves the exchange name for an event instance from its runtime class.
     *
     * @param obj event instance; must not be null
     */
    protected String getExcahngeName(Object obj) {
        final Class eventType = obj.getClass();
        return getExcahngeName(eventType);
    }

    /**
     * Resolves the exchange name for an event class: the value of its {@code ExchangeName}
     * annotation when present, otherwise the class' simple name.
     *
     * @param cls event class
     */
    protected String getExcahngeName(Class cls) {
        final ExchangeName declared = (ExchangeName) cls.getAnnotation(ExchangeName.class);
        if (declared == null) {
            return cls.getSimpleName();
        }
        return declared.value();
    }

    /**
     * Resolves the routing keys for an instance from its runtime class.
     *
     * @param obj annotated instance; must not be null
     */
    protected Set<String> getRoutingKeys(Object obj) {
        final Class annotatedType = obj.getClass();
        return getRoutingKeys(annotatedType);
    }

    /**
     * Collects the values of all {@code RoutingKey} annotations on the class. When the class
     * carries none, the single key {@code "#"} is returned.
     *
     * @param cls annotated class
     * @return the distinct routing keys
     */
    protected Set<String> getRoutingKeys(Class cls) {
        final RoutingKey[] declared = (RoutingKey[]) cls.getAnnotationsByType(RoutingKey.class);
        if (declared == null || declared.length == 0) {
            return Set.of("#");
        }
        return Arrays.stream(declared)
                .map(RoutingKey::value)
                .distinct()
                .collect(Collectors.toSet());
    }

    /**
     * Tries to bind a listener: opens a channel, applies the prefetch limit, verifies the
     * exchange, declares the listener queue, binds it for every routing key and starts a
     * consumer. On failure the channel is closed and the next attempt is scheduled after
     * {@code i * retryThresholdMillis} ms, until {@code i} exceeds {@code i2}.
     *
     * <p>NOTE(review): constructing the consumer registers the listener on the in-memory bus; if
     * {@code basicConsume} then fails, the retry registers it again. Confirm the in-memory bus
     * tolerates repeated registration.
     *
     * @param cls      event class the listener handles
     * @param listener listener to bind
     * @param i        1-based number of this attempt
     * @param i2       maximum number of attempts
     */
    private void doAddListener(Class cls, Listener listener, int i, int i2) {
        if (i < 1 || i2 < 1) {
            // Invalid counters are normalized to a single attempt, run shortly afterwards.
            this.rebindingExecutor.schedule(() -> doAddListener(cls, listener, 1, 1), 50L, TimeUnit.MILLISECONDS);
            return;
        }
        if (i > i2) {
            log.error("Failure to add listener {} for event {} : too many retries({}/{})", listener, cls, i, i2);
            // Give up here; without this return the method kept retrying forever.
            return;
        }
        Channel channel = null;
        try {
            log.warn("Attempting to add listener {} for event {} : attempt ({}/{})", listener, cls, i, i2);
            final String exchange = getExcahngeName(cls);
            final BuiltinExchangeType exchangeType = getExchangeType(cls);
            final String queueName = getQueueName(listener);
            final Set<String> routingKeys = getRoutingKeys(listener);
            channel = this.connection.createChannel();
            channel.basicQos(this.maxPrefetch, false);
            verifyOrUpdateExchange(exchange, exchangeType);
            final String declaredQueue = channel.queueDeclare(queueName, false, false, false, (Map) null).getQueue();
            for (String routingKey : routingKeys) {
                channel.queueBind(declaredQueue, exchange, routingKey);
            }
            channel.basicConsume(queueName, new RabbitMQBackingBusConsumer(channel, cls, listener));
            log.warn("Successfully added listener {} for event {} : attempt ({}/{})", listener, cls, i, i2);
        } catch (Throwable th) {
            log.warn("Failed to add listener {} for event {} : {}, trying again", listener, cls, th.getMessage());
            // The half-configured channel is useless; release it instead of leaking one per retry.
            closeChannelQuietly(channel);
            final long delayMillis = (long) i * this.retryThresholdMillis;
            this.rebindingExecutor.schedule(() -> doAddListener(cls, listener, i + 1, i2), delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Closes the channel if it is non-null and open, logging instead of propagating failures. */
    private void closeChannelQuietly(Channel channel) {
        if (channel == null || !channel.isOpen()) {
            return;
        }
        try {
            channel.close();
        } catch (IOException | TimeoutException | RuntimeException e) {
            log.debug("Could not close channel after failed listener binding : {}", e.getMessage());
        }
    }

    /**
     * Binds a listener for an event class, starting at attempt 1 with the configured number of
     * retries as the attempt limit.
     *
     * @param cls      event class the listener handles
     * @param listener listener to bind
     */
    protected void addListener(Class cls, Listener listener) {
        final int firstAttempt = 1;
        final int maxAttempts = this.retries;
        doAddListener(cls, listener, firstAttempt, maxAttempts);
    }

    /**
     * Builder default for the mapper: a new {@code ObjectMapper} with
     * {@code FAIL_ON_EMPTY_BEANS} and {@code FAIL_ON_UNKNOWN_PROPERTIES} disabled.
     */
    private static ObjectMapper $default$mapper() {
        final ObjectMapper lenientMapper = new ObjectMapper();
        lenientMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        lenientMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        return lenientMapper;
    }

    /** Builder default for the in-memory bus: a new {@code MemoryBackingBusImpl}. */
    private static MemoryBackingBusImpl $default$memoryBusImpl() {
        return new MemoryBackingBusImpl();
    }

    /**
     * Builder default for {@code allowUpdate}: an exchange whose declaration failed may be
     * deleted and declared again.
     */
    private static boolean $default$allowUpdate() {
        return true;
    }

    /**
     * Builder default for {@code maxPrefetch}, the value used both as the channel prefetch count
     * and as the size of each consumer's ingestor thread pool.
     */
    private static int $default$maxPrefetch() {
        return 10;
    }

    /**
     * Builder default for {@code requeue}: messages whose processing failed are rejected with
     * the requeue flag set.
     */
    private static boolean $default$requeue() {
        return true;
    }

    /** Builder default for {@code retries}, the attempt limit passed when adding a listener. */
    private static int $default$retries() {
        return 3;
    }

    /**
     * Builder default for {@code retryThresholdMillis}, the base delay in milliseconds that is
     * multiplied by the attempt number between listener binding attempts.
     */
    private static int $default$retryThresholdMillis() {
        return 3000;
    }

    /**
     * Creates a bus from fully resolved settings; instances are normally obtained through
     * {@link #builder()}.
     *
     * @param connection           connection used for all channels; must not be null
     * @param objectMapper         mapper used to serialize and deserialize events
     * @param memoryBackingBusImpl in-memory bus that received events are posted to
     * @param z                    value for {@code allowUpdate}
     * @param i                    value for {@code maxPrefetch}
     * @param z2                   value for {@code requeue}
     * @param i2                   value for {@code retries}
     * @param i3                   value for {@code retryThresholdMillis}
     * @throws NullPointerException if {@code connection} is null
     */
    RabbitMQBackingBusImpl(@NonNull Connection connection, ObjectMapper objectMapper, MemoryBackingBusImpl memoryBackingBusImpl, boolean z, int i, boolean z2, int i2, int i3) {
        this.connection = java.util.Objects.requireNonNull(connection, "connection is marked non-null but is null");
        this.mapper = objectMapper;
        this.memoryBusImpl = memoryBackingBusImpl;
        this.allowUpdate = z;
        this.requeue = z2;
        this.maxPrefetch = i;
        this.retries = i2;
        this.retryThresholdMillis = i3;
    }

    /** Returns a new, empty builder for {@code RabbitMQBackingBusImpl}. */
    public static RabbitMQBackingBusImplBuilder builder() {
        return new RabbitMQBackingBusImplBuilder();
    }
}
