package com.netflix.conductor.contribs.queue.stan;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Subscription;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;

/* loaded from: input_file:com/netflix/conductor/contribs/queue/stan/NATSObservableQueue.class */
public class NATSObservableQueue extends NATSAbstractQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(NATSObservableQueue.class);
    private Subscription subs;
    private Connection conn;

    public NATSObservableQueue(String str, Scheduler scheduler) {
        super(str, "nats", scheduler);
        open();
    }

    @Override // com.netflix.conductor.contribs.queue.stan.NATSAbstractQueue
    public boolean isConnected() {
        return this.conn != null && Connection.Status.CONNECTED.equals(this.conn.getStatus());
    }

    @Override // com.netflix.conductor.contribs.queue.stan.NATSAbstractQueue
    public void connect() {
        try {
            Connection connect = Nats.connect();
            LOGGER.info("Successfully connected for " + this.queueURI);
            this.conn = connect;
        } catch (Exception e) {
            LOGGER.error("Unable to establish nats connection for " + this.queueURI, e);
            throw new RuntimeException(e);
        }
    }

    @Override // com.netflix.conductor.contribs.queue.stan.NATSAbstractQueue
    public void subscribe() {
        if (this.subs != null) {
            return;
        }
        try {
            ensureConnected();
            if (StringUtils.isNotEmpty(this.queue)) {
                LOGGER.info("No subscription. Creating a queue subscription. subject={}, queue={}", this.subject, this.queue);
                this.conn.createDispatcher(message -> {
                    onMessage(message.getSubject(), message.getData());
                });
                this.subs = this.conn.subscribe(this.subject, this.queue);
            } else {
                LOGGER.info("No subscription. Creating a pub/sub subscription. subject={}", this.subject);
                this.conn.createDispatcher(message2 -> {
                    onMessage(message2.getSubject(), message2.getData());
                });
                this.subs = this.conn.subscribe(this.subject);
            }
        } catch (Exception e) {
            LOGGER.error("Subscription failed with " + e.getMessage() + " for queueURI " + this.queueURI, e);
        }
    }

    @Override // com.netflix.conductor.contribs.queue.stan.NATSAbstractQueue
    public void publish(String str, byte[] bArr) throws Exception {
        ensureConnected();
        this.conn.publish(str, bArr);
    }

    @Override // com.netflix.conductor.contribs.queue.stan.NATSAbstractQueue
    public void closeSubs() {
        if (this.subs != null) {
            try {
                this.subs.unsubscribe();
            } catch (Exception e) {
                LOGGER.error("closeSubs failed with " + e.getMessage() + " for " + this.queueURI, e);
            }
            this.subs = null;
        }
    }

    @Override // com.netflix.conductor.contribs.queue.stan.NATSAbstractQueue
    public void closeConn() {
        if (this.conn != null) {
            try {
                this.conn.close();
            } catch (Exception e) {
                LOGGER.error("closeConn failed with " + e.getMessage() + " for " + this.queueURI, e);
            }
            this.conn = null;
        }
    }
}
