package com.sitewhere.device.communication.mqtt;

import com.sitewhere.server.lifecycle.LifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.device.communication.IInboundEventReceiver;
import com.sitewhere.spi.device.communication.IInboundEventSource;
import com.sitewhere.spi.server.lifecycle.LifecycleComponentType;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/* loaded from: input_file:com/sitewhere/device/communication/mqtt/MqttInboundEventReceiver.class */
public class MqttInboundEventReceiver extends LifecycleComponent implements IInboundEventReceiver<byte[]> {
    private static Logger LOGGER = Logger.getLogger(MqttInboundEventReceiver.class);
    public static final String DEFAULT_HOSTNAME = "localhost";
    public static final int DEFAULT_PORT = 1883;
    public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 5;
    public static final String DEFAULT_TOPIC = "SiteWhere/input/protobuf";
    private IInboundEventSource<byte[]> eventSource;
    private String hostname;
    private int port;
    private String topic;
    private MQTT mqtt;
    private FutureConnection connection;
    private ExecutorService executor;

    /* loaded from: input_file:com/sitewhere/device/communication/mqtt/MqttInboundEventReceiver$MqttSubscriptionProcessor.class */
    private class MqttSubscriptionProcessor implements Runnable {
        private MqttSubscriptionProcessor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            MqttInboundEventReceiver.LOGGER.info("Started MQTT subscription processing thread.");
            while (true) {
                try {
                    Message message = (Message) MqttInboundEventReceiver.this.connection.receive().await();
                    message.ack();
                    MqttInboundEventReceiver.this.onEventPayloadReceived(message.getPayload());
                } catch (InterruptedException e) {
                    return;
                } catch (Throwable th) {
                    MqttInboundEventReceiver.LOGGER.error(th);
                }
            }
        }
    }

    /* loaded from: input_file:com/sitewhere/device/communication/mqtt/MqttInboundEventReceiver$SubscribersThreadFactory.class */
    private class SubscribersThreadFactory implements ThreadFactory {
        private AtomicInteger counter;

        private SubscribersThreadFactory() {
            this.counter = new AtomicInteger();
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "SiteWhere MQTT(" + MqttInboundEventReceiver.this.getEventSource().getSourceId() + " - " + MqttInboundEventReceiver.this.getTopic() + ") Receiver " + this.counter.incrementAndGet());
        }
    }

    public MqttInboundEventReceiver() {
        super(LifecycleComponentType.InboundEventReceiver);
        this.hostname = "localhost";
        this.port = 1883;
        this.topic = DEFAULT_TOPIC;
    }

    public void start() throws SiteWhereException {
        this.executor = Executors.newSingleThreadExecutor(new SubscribersThreadFactory());
        this.mqtt = new MQTT();
        try {
            this.mqtt.setHost(getHostname(), getPort());
            LOGGER.info("Receiver connecting to MQTT broker at '" + getHostname() + ":" + getPort() + "'...");
            this.connection = this.mqtt.futureConnection();
            try {
                this.connection.connect().await(5L, TimeUnit.SECONDS);
                LOGGER.info("Receiver connected to MQTT broker.");
                try {
                    this.connection.subscribe(new Topic[]{new Topic(getTopic(), QoS.AT_LEAST_ONCE)}).await();
                    LOGGER.info("Subscribed to events on MQTT topic: " + getTopic());
                    this.executor.execute(new MqttSubscriptionProcessor());
                } catch (Exception e) {
                    throw new SiteWhereException("Exception while attempting to subscribe to MQTT topic: " + getTopic(), e);
                }
            } catch (Exception e2) {
                throw new SiteWhereException("Unable to connect to MQTT broker.", e2);
            }
        } catch (URISyntaxException e3) {
            throw new SiteWhereException("Invalid hostname for MQTT server.", e3);
        }
    }

    public Logger getLogger() {
        return LOGGER;
    }

    public String getDisplayName() {
        return getHostname() + ":" + getPort() + "/" + getTopic();
    }

    public void onEventPayloadReceived(byte[] bArr) {
        getEventSource().onEncodedEventReceived(this, bArr);
    }

    public void stop() throws SiteWhereException {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        if (this.connection != null) {
            try {
                this.connection.disconnect();
                this.connection.kill();
            } catch (Exception e) {
                LOGGER.error("Error shutting down MQTT device event receiver.", e);
            }
        }
    }

    public IInboundEventSource<byte[]> getEventSource() {
        return this.eventSource;
    }

    public void setEventSource(IInboundEventSource<byte[]> iInboundEventSource) {
        this.eventSource = iInboundEventSource;
    }

    public String getHostname() {
        return this.hostname;
    }

    public void setHostname(String str) {
        this.hostname = str;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int i) {
        this.port = i;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }
}
