package me.ahoo.pigeon.bus.rabbit;

import com.google.common.base.Strings;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import me.ahoo.pigeon.bus.rabbit.codec.RabbitMessageEncoder;
import me.ahoo.pigeon.bus.rabbit.config.BusRabbitConfig;
import me.ahoo.pigeon.core.bus.MessageBus;
import me.ahoo.pigeon.core.bus.MessageTopicParser;
import me.ahoo.pigeon.core.bus.PublishResult;
import me.ahoo.pigeon.core.bus.subscriber.Subscriber;
import me.ahoo.pigeon.core.bus.subscriber.SubscriberRegistry;
import me.ahoo.pigeon.core.id.IdGenerator;
import me.ahoo.pigeon.core.message.Message;
import me.ahoo.pigeon.core.util.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/* loaded from: input_file:me/ahoo/pigeon/bus/rabbit/RabbitMessageBus.class */
public class RabbitMessageBus implements MessageBus, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RabbitMessageBus.class);
    private final RabbitMessageEncoder rabbitMessageEncoder;
    private final SubscriberRegistry subscriberRegistry;
    private final MessageTopicParser messageTopicParser;
    private final IdGenerator idGenerator;
    private final BusRabbitConfig busRabbitConfig;
    private final RabbitTemplate rabbitTemplate;

    public RabbitMessageBus(BusRabbitConfig busRabbitConfig, RabbitMessageEncoder rabbitMessageEncoder, SubscriberRegistry subscriberRegistry, MessageTopicParser messageTopicParser, ConnectionFactory connectionFactory, IdGenerator idGenerator) {
        this.busRabbitConfig = busRabbitConfig;
        this.rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate.setExchange(busRabbitConfig.getExchange());
        this.rabbitMessageEncoder = rabbitMessageEncoder;
        this.subscriberRegistry = subscriberRegistry;
        this.messageTopicParser = messageTopicParser;
        this.idGenerator = idGenerator;
    }

    public CompletableFuture<PublishResult> publish(Message message) {
        try {
            if (Objects.isNull(message.getId(false))) {
                message.setId(Long.valueOf(this.idGenerator.generate()));
            }
            if (Objects.isNull(message.getCreateTime(false))) {
                message.ofNow();
            }
            String parseMessageTopic = this.messageTopicParser.parseMessageTopic(message);
            this.rabbitTemplate.send(parseMessageTopic, this.rabbitMessageEncoder.encode((Message<Object>) message));
            if (log.isDebugEnabled()) {
                log.debug("publish - exchange:[{}] routingKey:[{}] -> message.id:[{}] ,direction:[{}],commandType:[{}]", new Object[]{this.busRabbitConfig.getExchange(), parseMessageTopic, message.getId(false), message.getDirection(false), message.getCommandType(false)});
            }
            return CompletableFuture.completedFuture(new PublishResult(message));
        } catch (Throwable th) {
            log.error(Strings.lenientFormat("publish - failed! -> id:[%s] ,direction:[%s],commandType:[%s]", new Object[]{message.getId(false), message.getDirection(false), message.getCommandType(false)}), th);
            return Futures.ofFailed(th);
        }
    }

    public void subscribe(Subscriber subscriber) {
        this.subscriberRegistry.register(subscriber);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.rabbitTemplate.destroy();
    }
}
