package poussecafe.pulsar;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import poussecafe.exception.PousseCafeException;
import poussecafe.jackson.JacksonMessageAdapter;
import poussecafe.messaging.Message;
import poussecafe.messaging.MessageSender;
import poussecafe.runtime.OriginalAndMarshaledMessage;

/* loaded from: input_file:poussecafe/pulsar/PulsarMessageSender.class */
public class PulsarMessageSender extends MessageSender {
    private PulsarMessagingConfiguration configuration;
    private PulsarClient client;
    private Producer<String> defaultTopicProducer;
    private Map<String, Producer<String>> producers;

    /* loaded from: input_file:poussecafe/pulsar/PulsarMessageSender$Builder.class */
    public static class Builder {
        private PulsarMessageSender sender = new PulsarMessageSender();

        public Builder configuration(PulsarMessagingConfiguration pulsarMessagingConfiguration) {
            this.sender.configuration = pulsarMessagingConfiguration;
            return this;
        }

        public Builder client(PulsarClient pulsarClient) {
            this.sender.client = pulsarClient;
            return this;
        }

        public PulsarMessageSender build() {
            Objects.requireNonNull(this.sender.configuration);
            Objects.requireNonNull(this.sender.client);
            this.sender.defaultTopicProducer = this.sender.createProducer(this.sender.configuration.defaultPublicationTopic());
            return this.sender;
        }
    }

    private PulsarMessageSender() {
        super(new JacksonMessageAdapter());
        this.producers = new HashMap();
    }

    private Producer<String> createProducer(String str) {
        try {
            return this.client.newProducer(Schema.STRING).topic(str).create();
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to connect to Pulsar broker", e);
        }
    }

    protected synchronized void sendMarshalledMessage(OriginalAndMarshaledMessage originalAndMarshaledMessage) {
        try {
            producer(originalAndMarshaledMessage.original()).send((String) originalAndMarshaledMessage.marshaled());
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to send message to Pulsar broker", e);
        }
    }

    private Producer<String> producer(Message message) {
        Optional<String> chooseTopicForMessage = this.configuration.publicationTopicChooser().chooseTopicForMessage(message);
        return chooseTopicForMessage.isPresent() ? getOrCreateProducer(chooseTopicForMessage.get()) : this.defaultTopicProducer;
    }

    private Producer<String> getOrCreateProducer(String str) {
        Producer<String> producer = this.producers.get(str);
        if (producer == null) {
            producer = createProducer(str);
            this.producers.put(str, producer);
        }
        return producer;
    }

    public void sendMarshalledMessages(List<OriginalAndMarshaledMessage> list) {
        Iterator<Map.Entry<Producer<String>, List<OriginalAndMarshaledMessage>>> it = messagesPerProducer(list).entrySet().iterator();
        while (it.hasNext()) {
            sendBulk(it.next());
        }
    }

    private Map<Producer<String>, List<OriginalAndMarshaledMessage>> messagesPerProducer(List<OriginalAndMarshaledMessage> list) {
        HashMap hashMap = new HashMap();
        for (OriginalAndMarshaledMessage originalAndMarshaledMessage : list) {
            ((List) hashMap.computeIfAbsent(producer(originalAndMarshaledMessage.original()), producer -> {
                return new ArrayList();
            })).add(originalAndMarshaledMessage);
        }
        return hashMap;
    }

    private void sendBulk(Map.Entry<Producer<String>, List<OriginalAndMarshaledMessage>> entry) {
        Producer<String> key = entry.getKey();
        Iterator<OriginalAndMarshaledMessage> it = entry.getValue().iterator();
        while (it.hasNext()) {
            key.sendAsync((String) it.next().marshaled());
        }
        try {
            key.flush();
        } catch (PulsarClientException e) {
            throw new PousseCafeException("Unable to send messages to Pulsar broker", e);
        }
    }
}
