package me.hekr.iotos.softgateway.network.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import me.hekr.iotos.softgateway.network.mqtt.listener.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/hekr/iotos/softgateway/network/mqtt/MqttServer.class */
/**
 * Thin wrapper around the Vert.x MQTT server that wires every incoming endpoint to a
 * {@link Listener} (authentication, ACL checks, message delivery, close notification) and a
 * {@link PacketCoder} (payload decoding).
 *
 * <p>Both {@link #setPacketCoder(PacketCoder)} and {@link #setListener(Listener)} must be called
 * before {@link #start()}.
 *
 * @param <T> type of the decoded message handed to the listener
 */
public class MqttServer<T> {
    private static final Logger log = LoggerFactory.getLogger(MqttServer.class);

    /** Size used for both the Vert.x event-loop pool and the worker pool. */
    private final int poolSize;

    PacketCoder<T> packetCoder;
    private Vertx vertx;
    private io.vertx.mqtt.MqttServer server;
    private int port;
    private Listener<T> listener;

    /**
     * Creates a server that will listen on the given port once {@link #start()} is called.
     *
     * @param port TCP port to bind
     */
    public MqttServer(int port) {
        this.poolSize = Runtime.getRuntime().availableProcessors() * 4;
        this.port = port;
    }

    /** Creates a server bound to port 1883. */
    public MqttServer() {
        this(1883);
    }

    /**
     * Creates the Vert.x instance and the MQTT server, installs the per-endpoint handlers and
     * starts listening on the configured port. The outcome of the bind is logged from the
     * completion callback of {@code listen}.
     *
     * @throws NullPointerException if the packet coder or the listener has not been set
     */
    public void start() {
        Objects.requireNonNull(this.packetCoder, "packetCoder 必填");
        Objects.requireNonNull(this.listener, "listener 必填");

        VertxOptions vertxOptions = new VertxOptions();
        vertxOptions.setEventLoopPoolSize(this.poolSize);
        vertxOptions.setWorkerPoolSize(this.poolSize);
        this.vertx = Vertx.vertx(vertxOptions);

        this.server = io.vertx.mqtt.MqttServer.create(this.vertx);
        this.server.exceptionHandler(error -> log.error(error.getMessage(), error));
        this.server.endpointHandler(endpoint -> {
            MqttConnections.add(endpoint).setPacketCoder(this.packetCoder);
            endpoint.autoKeepAlive(true);
            endpoint.publishAutoAck(true);
            // Handlers are installed before handleAuth accepts or rejects the connection.
            handleClose(endpoint);
            handlePublishMessage(endpoint);
            handleAuth(endpoint);
            handleException(endpoint);
            handleSubscribe(endpoint);
        });
        this.server.listen(this.port).onComplete(result -> {
            if (result.succeeded()) {
                log.info("mqtt server 启动成功，绑定端口：{}", this.server.actualPort());
            } else {
                Throwable cause = result.cause();
                log.error("mqtt server 启动失败，{}", cause.getMessage(), cause);
            }
        });
    }

    /**
     * Answers SUBSCRIBE packets with the QoS levels granted by
     * {@link Listener#aclSubTopic}; the per-topic verdict is logged at debug level.
     */
    private void handleSubscribe(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.subscribeHandler(subscribeMessage -> {
            ConnectionContext<T> context = MqttConnections.get(mqttEndpoint);
            List<MqttTopicSubscription> requested = subscribeMessage.topicSubscriptions();
            List<MqttQoS> granted = this.listener.aclSubTopic(context, requested);
            if (log.isDebugEnabled()) {
                // Bound by the shorter list so that diagnostics can never throw and
                // prevent the acknowledgement below from being sent.
                int count = Math.min(requested.size(), granted.size());
                for (int index = 0; index < count; index++) {
                    MqttTopicSubscription subscription = requested.get(index);
                    MqttQoS grantedQos = granted.get(index);
                    String verdict = grantedQos == MqttQoS.FAILURE ? "不允许 " : "允许 ";
                    log.debug(
                            "{}{} 订阅 {} qos:{} 协商 qos:{}",
                            verdict,
                            context.getClientId(),
                            subscription.topicName(),
                            subscription.qualityOfService(),
                            grantedQos);
                }
            }
            mqttEndpoint.subscribeAcknowledge(subscribeMessage.messageId(), granted);
        });
    }

    /** Logs every exception reported by the endpoint. */
    private void handleException(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.exceptionHandler(error -> log.error(error.getMessage(), error));
    }

    /** Removes the endpoint from {@link MqttConnections} on close and notifies the listener. */
    private void handleClose(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.closeHandler(ignored -> this.listener.onClose(MqttConnections.remove(mqttEndpoint)));
    }

    /**
     * Delivers PUBLISH packets to the listener after an ACL check.
     *
     * <p>A message is decoded and forwarded only when {@link Listener#aclPubTopic} allows it.
     * Rejected messages are always dropped; whether debug logging is enabled only decides if
     * the rejection is logged, never whether the message is delivered.
     */
    private void handlePublishMessage(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.publishHandler(publishMessage -> {
            ConnectionContext<T> context = MqttConnections.get(mqttEndpoint);
            String topic = publishMessage.topicName();
            MqttQoS qos = publishMessage.qosLevel();

            if (!this.listener.aclPubTopic(context, topic, qos)) {
                log.debug(
                        "publish topic 拒绝，丢弃消息, clientId: {}, username: {}, topic: {}, qos:{}",
                        context.getClientId(),
                        context.getUsername(),
                        topic,
                        qos);
                return;
            }

            T decoded = this.packetCoder.decode(publishMessage.payload().getBytes());
            this.listener.onMessage(context, topic, qos, decoded);
        });
    }

    /**
     * Copies the credentials of the CONNECT packet into the connection context, asks the
     * listener to authenticate it, records the result and time, and then accepts or rejects
     * the connection. Credentials are {@code null} when the client sent none.
     */
    private void handleAuth(MqttEndpoint mqttEndpoint) {
        MqttAuth credentials = mqttEndpoint.auth();
        String username = null;
        String password = null;
        if (credentials != null) {
            username = credentials.getUsername();
            password = credentials.getPassword();
        }

        ConnectionContext<T> context = MqttConnections.get(mqttEndpoint);
        context.setUsername(username);
        context.setPassword(password);

        boolean authenticated = this.listener.auth(context);
        context.setAuth(authenticated);
        context.setAuthTime(LocalDateTime.now());

        if (authenticated) {
            mqttEndpoint.accept(false);
        } else {
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }

    /** Closes the MQTT server and the Vert.x instance if they were created. */
    public void close() {
        if (this.server != null) {
            this.server.close();
        }
        if (this.vertx != null) {
            this.vertx.close();
        }
    }

    /**
     * Sets the coder used to decode published payloads; required before {@link #start()}.
     *
     * @param packetCoder payload coder
     */
    public void setPacketCoder(PacketCoder<T> packetCoder) {
        this.packetCoder = packetCoder;
    }

    /**
     * Sets the port used by the next call to {@link #start()}.
     *
     * @param port TCP port to bind
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Sets the listener that receives connection events; required before {@link #start()}.
     *
     * @param listener callback for auth, ACL, message and close events
     */
    public void setListener(Listener<T> listener) {
        this.listener = listener;
    }
}
