package com.sitewhere.device.communication.mqtt;

import com.sitewhere.server.lifecycle.LifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.device.communication.IInboundEventReceiver;
import com.sitewhere.spi.device.communication.IInboundEventSource;
import com.sitewhere.spi.server.lifecycle.LifecycleComponentType;
import java.net.URISyntaxException;
import org.apache.log4j.Logger;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/* loaded from: input_file:com/sitewhere/device/communication/mqtt/MqttCallbackInboundEventReceiver.class */
public class MqttCallbackInboundEventReceiver extends LifecycleComponent implements IInboundEventReceiver<byte[]> {
    private static Logger LOGGER = Logger.getLogger(MqttInboundEventReceiver.class);
    public static final String DEFAULT_HOSTNAME = "localhost";
    public static final int DEFAULT_PORT = 1883;
    public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 5;
    public static final String DEFAULT_TOPIC = "SiteWhere/input/protobuf";
    private IInboundEventSource<byte[]> eventSource;
    private String hostname;
    private int port;
    private String topic;
    private MQTT mqtt;
    private CallbackConnection connection;

    public MqttCallbackInboundEventReceiver() {
        super(LifecycleComponentType.InboundEventReceiver);
        this.hostname = "localhost";
        this.port = 1883;
        this.topic = "SiteWhere/input/protobuf";
    }

    public void start() throws SiteWhereException {
        try {
            this.mqtt = new MQTT();
            this.mqtt.setHost(getHostname(), getPort());
            LOGGER.info("Receiver connecting to MQTT broker at '" + getHostname() + ":" + getPort() + "'...");
            this.connection = this.mqtt.callbackConnection();
            createListener();
            this.connection.connect(new Callback<Void>() { // from class: com.sitewhere.device.communication.mqtt.MqttCallbackInboundEventReceiver.1
                public void onFailure(Throwable th) {
                    MqttCallbackInboundEventReceiver.LOGGER.error("MQTT connection failed.", th);
                }

                public void onSuccess(Void r9) {
                    MqttCallbackInboundEventReceiver.this.connection.subscribe(new Topic[]{new Topic(MqttCallbackInboundEventReceiver.this.getTopic(), QoS.AT_LEAST_ONCE)}, new Callback<byte[]>() { // from class: com.sitewhere.device.communication.mqtt.MqttCallbackInboundEventReceiver.1.1
                        public void onFailure(Throwable th) {
                            MqttCallbackInboundEventReceiver.LOGGER.error("MQTT subscribe failed.", th);
                        }

                        public void onSuccess(byte[] bArr) {
                            MqttCallbackInboundEventReceiver.LOGGER.info("Subscribed to events on MQTT topic: " + MqttCallbackInboundEventReceiver.this.getTopic());
                        }
                    });
                }
            });
        } catch (URISyntaxException e) {
            throw new SiteWhereException("Invalid hostname for MQTT server.", e);
        }
    }

    protected void createListener() {
        this.connection.listener(new Listener() { // from class: com.sitewhere.device.communication.mqtt.MqttCallbackInboundEventReceiver.2
            public void onDisconnected() {
                MqttCallbackInboundEventReceiver.LOGGER.info("MQTT connection disconnected.");
            }

            public void onConnected() {
                MqttCallbackInboundEventReceiver.LOGGER.info("MQTT connection established.");
            }

            public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, Runnable runnable) {
                runnable.run();
                MqttCallbackInboundEventReceiver.this.onEventPayloadReceived(buffer.data);
            }

            public void onFailure(Throwable th) {
                MqttCallbackInboundEventReceiver.LOGGER.info("MQTT connection died.");
                MqttCallbackInboundEventReceiver.this.connection.disconnect(new Callback<Void>() { // from class: com.sitewhere.device.communication.mqtt.MqttCallbackInboundEventReceiver.2.1
                    public void onFailure(Throwable th2) {
                    }

                    public void onSuccess(Void r2) {
                    }
                });
            }
        });
    }

    public Logger getLogger() {
        return LOGGER;
    }

    public String getDisplayName() {
        return getHostname() + ":" + getPort() + "/" + getTopic();
    }

    public void onEventPayloadReceived(byte[] bArr) {
        getEventSource().onEncodedEventReceived(this, bArr);
    }

    public void stop() throws SiteWhereException {
        if (this.connection != null) {
            try {
                this.connection.disconnect(new Callback<Void>() { // from class: com.sitewhere.device.communication.mqtt.MqttCallbackInboundEventReceiver.3
                    public void onFailure(Throwable th) {
                        MqttCallbackInboundEventReceiver.LOGGER.info("MQTT disconnect failed.");
                    }

                    public void onSuccess(Void r2) {
                    }
                });
                this.connection.transport().stop(new Runnable() { // from class: com.sitewhere.device.communication.mqtt.MqttCallbackInboundEventReceiver.4
                    @Override // java.lang.Runnable
                    public void run() {
                        MqttCallbackInboundEventReceiver.LOGGER.info("MQTT connection transport stopped.");
                    }
                });
            } catch (Exception e) {
                LOGGER.error("Error shutting down MQTT device event receiver.", e);
            }
        }
    }

    public IInboundEventSource<byte[]> getEventSource() {
        return this.eventSource;
    }

    public void setEventSource(IInboundEventSource<byte[]> iInboundEventSource) {
        this.eventSource = iInboundEventSource;
    }

    public String getHostname() {
        return this.hostname;
    }

    public void setHostname(String str) {
        this.hostname = str;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int i) {
        this.port = i;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }
}
