package org.enode.pulsar.message;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Producer;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.SendMessageResult;
import org.enodeframework.queue.SendMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enode/pulsar/message/PulsarSendMessageService.class */
/**
 * {@link SendMessageService} implementation that publishes {@link QueueMessage}s through
 * Pulsar producers.
 *
 * <p>The producer used for a message is looked up in a map keyed by the message's type
 * character ({@code queueMessage.getType()}).
 */
public class PulsarSendMessageService implements SendMessageService {
    private static final Logger logger = LoggerFactory.getLogger(PulsarSendMessageService.class);

    /** Producers keyed by the message type character they are responsible for. */
    private final Map<Character, Producer<byte[]>> producerMap;

    /**
     * Creates the service.
     *
     * @param map producers keyed by message type character; the reference is stored as-is
     *            (no defensive copy)
     */
    public PulsarSendMessageService(Map<Character, Producer<byte[]>> map) {
        this.producerMap = map;
    }

    /**
     * Sends {@code queueMessage} with the producer registered for its type.
     *
     * <p>The route key becomes the Pulsar message key, {@code getBodyAndType()} (UTF-8 encoded)
     * becomes the payload, and {@code getKey()} (UTF-8 encoded) becomes the ordering key.
     *
     * @param queueMessage the message to publish
     * @return a future that completes with a {@link SendMessageResult} on success. It completes
     *         exceptionally with an {@link IORuntimeException} when no producer is registered
     *         for the message type or when the asynchronous send fails, and with the original
     *         {@link RuntimeException} when building or dispatching the message fails
     *         synchronously.
     */
    public CompletableFuture<SendMessageResult> sendMessageAsync(QueueMessage queueMessage) {
        CompletableFuture<SendMessageResult> result = new CompletableFuture<>();
        Producer<byte[]> producer = this.producerMap.get(Character.valueOf(queueMessage.getType()));

        if (producer == null) {
            String errorMessage = String.format("No producer for topic: [%s], %s", Character.valueOf(queueMessage.getType()), queueMessage.getTopic());
            result.completeExceptionally(new IORuntimeException(errorMessage));
            logger.error(errorMessage);
            return result;
        }

        try {
            producer.newMessage()
                .key(queueMessage.getRouteKey())
                // Explicit charset: the no-arg getBytes() depends on the JVM default charset,
                // which would make the bytes on the wire vary between hosts.
                .value(queueMessage.getBodyAndType().getBytes(StandardCharsets.UTF_8))
                .orderingKey(queueMessage.getKey().getBytes(StandardCharsets.UTF_8))
                .sendAsync()
                .whenComplete((messageId, th) -> {
                    if (th != null) {
                        logger.error("Async send message has exception, message: {}", queueMessage, th);
                        result.completeExceptionally(new IORuntimeException(th));
                        return;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Async send message success, sendResult: {}, message: {}", messageId, queueMessage);
                    }
                    // NOTE(review): the id string is decoded from the serialized message id bytes,
                    // which are presumably binary rather than text; the charset is pinned only so
                    // the result is the same on every host. Confirm whether messageId.toString()
                    // would be the better identifier.
                    String id = new String(messageId.toByteArray(), StandardCharsets.UTF_8);
                    result.complete(new SendMessageResult(id, messageId));
                });
        } catch (RuntimeException e) {
            // Building or dispatching the message failed before an async result existed (for
            // example a null key or body). Report it through the future, keeping the original
            // exception type, so callers of this async method see every failure in one place.
            logger.error("Send message failed before dispatch, message: {}", queueMessage, e);
            result.completeExceptionally(e);
        }
        return result;
    }
}
