package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ch/squaredesk/nova/comm/jms/MessageReceiver.class */
/**
 * JMS flavour of the generic message receiver: exposes the text messages arriving on a JMS
 * {@link Destination} as a shared {@link Flowable}.
 *
 * <p>One stream is cached per destination ID (as computed by the {@link JmsObjectRepository}).
 * The underlying message consumer is created when the stream is first subscribed to and is
 * destroyed (and the cache entry removed) when the generator's state is disposed.
 */
public class MessageReceiver extends ch.squaredesk.nova.comm.retrieving.MessageReceiver<Destination, String, IncomingMessageMetaData> {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
    private final JmsObjectRepository jmsObjectRepository;
    /** Cache of the shared message streams, keyed by destination ID. */
    private final Map<String, Flowable<IncomingMessage<String, IncomingMessageMetaData>>> mapDestinationIdToMessageStream;
    private final JmsMessageMetaDataCreator messageDetailsCreator;

    /**
     * Creates a new receiver.
     *
     * @param str                 identifier that is handed to the superclass constructor
     * @param jmsObjectRepository repository used to resolve destination IDs and to create and
     *                            destroy message consumers
     * @param metrics             metrics instance that is handed to the superclass constructor
     */
    public MessageReceiver(String str, JmsObjectRepository jmsObjectRepository, Metrics metrics) {
        super(str, metrics);
        this.mapDestinationIdToMessageStream = new ConcurrentHashMap<>();
        this.messageDetailsCreator = new JmsMessageMetaDataCreator();
        this.jmsObjectRepository = jmsObjectRepository;
    }

    /**
     * Returns the shared stream of text messages received on the passed destination. Repeated
     * calls for the same destination ID return the cached stream for as long as it is registered.
     *
     * <p>Messages that are not {@link TextMessage}s, or that cannot be read, are logged, counted
     * as unparsable and skipped. The stream completes as soon as no message can be received from
     * the consumer anymore.
     *
     * @param destination the JMS destination to read from, must not be {@code null}
     * @return the (shared) stream of incoming messages for the destination
     * @throws NullPointerException if {@code destination} is {@code null}
     */
    public Flowable<IncomingMessage<String, IncomingMessageMetaData>> messages(Destination destination) {
        Objects.requireNonNull(destination, "destination must not be null");
        String destinationId = this.jmsObjectRepository.idFor(destination);
        return this.mapDestinationIdToMessageStream.computeIfAbsent(destinationId, key -> {
            // Assigning to an explicitly typed local gives generate() a target type, so the
            // element type is not inferred as Object.
            Flowable<IncomingMessage<String, IncomingMessageMetaData>> stream = Flowable.generate(
                () -> {
                    logger.info("Opening connection to destination {}", destinationId);
                    this.metricsCollector.subscriptionCreated(destinationId);
                    return this.jmsObjectRepository.createMessageConsumer(destination);
                },
                (messageConsumer, emitter) -> {
                    // generate() expects at most one signal per invocation, so keep receiving
                    // until there is either a usable message or the consumer is exhausted.
                    IncomingMessage<String, IncomingMessageMetaData> incomingMessage = null;
                    while (incomingMessage == null) {
                        Message message = null;
                        try {
                            message = messageConsumer.receive();
                        } catch (Exception e) {
                            // Presumably the consumer was closed while we were blocked in
                            // receive(); keep the cause visible instead of swallowing it.
                            logger.info("Receiving from consumer for destination {} failed", destinationId, e);
                        }
                        if (message == null) {
                            logger.info("Unable to receive message from consumer for destination {}. Closing the connection...",
                                    destinationId);
                            emitter.onComplete();
                            return;
                        }
                        incomingMessage = toIncomingMessage(message, destinationId);
                    }
                    emitter.onNext(incomingMessage);
                },
                messageConsumer -> {
                    this.metricsCollector.subscriptionDestroyed(destinationId);
                    this.jmsObjectRepository.destroyConsumer(messageConsumer);
                    this.mapDestinationIdToMessageStream.remove(destinationId);
                    logger.info("Closed connection to destination {}", destinationId);
                });
            return stream.subscribeOn(Schedulers.io()).share();
        });
    }

    /**
     * Converts a raw JMS message into an {@link IncomingMessage} and updates the metrics.
     *
     * @param message       the received JMS message, not {@code null}
     * @param destinationId ID of the destination the message was received on (used for metrics)
     * @return the converted message, or {@code null} if the message is not a {@link TextMessage}
     *         or could not be read
     */
    private IncomingMessage<String, IncomingMessageMetaData> toIncomingMessage(Message message, String destinationId) {
        if (!(message instanceof TextMessage)) {
            logger.error("Unsupported type of incoming message {}", message);
            this.metricsCollector.unparsableMessageReceived(destinationId);
            return null;
        }
        try {
            IncomingMessage<String, IncomingMessageMetaData> incomingMessage = new IncomingMessage<>(
                    ((TextMessage) message).getText(),
                    this.messageDetailsCreator.createIncomingMessageMetaData(message));
            this.metricsCollector.messageReceived(destinationId);
            return incomingMessage;
        } catch (Exception e) {
            logger.error("Unable to read incoming message {}", message, e);
            this.metricsCollector.unparsableMessageReceived(destinationId);
            return null;
        }
    }
}
