package cool.taomu.mqtt.broker.factory;

import cool.taomu.mqtt.broker.entity.ClientSessionEntity;
import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.storage.inter.IStorage;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.timeout.IdleStateHandler;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cool/taomu/mqtt/broker/factory/ConnectRequest.class */
/**
 * Handles an MQTT CONNECT packet: validates the client, installs the keep-alive
 * watchdog, records the session and will message, and replies with a CONNACK.
 *
 * <p>The outcome of the last {@link #request} call is kept in the {@code code} and
 * {@code sessionPresent} fields, which {@link #response} reads when building the CONNACK.
 * NOTE(review): because that state lives in instance fields, an instance should not be
 * used for several connections concurrently — confirm how instances are created.
 */
public class ConnectRequest implements IProcess {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectRequest.class);

    /** Storage bucket holding one {@link ClientSessionEntity} per client id. */
    private static final String SESSION_STORE = "mqtt-session";
    /** Storage bucket holding the will {@link MessageEntity} per client id. */
    private static final String WILL_STORE = "mqtt-will";
    /** Pipeline name under which the keep-alive {@link IdleStateHandler} is registered. */
    private static final String IDLE_HANDLER_NAME = "idleStateHandler";
    /** The reader-idle timeout is the client's keep-alive interval times this factor. */
    private static final float KEEP_ALIVE_GRACE_FACTOR = 1.5f;

    private final IStorage cache = new DataStorage();
    private MqttConnectReturnCode code;
    private boolean sessionPresent;

    /**
     * Processes a CONNECT packet and always answers with exactly one CONNACK.
     * Messages that are not {@link MqttConnectMessage} instances are ignored.
     *
     * <p>Checks run in this order, the first failure deciding the return code:
     * protocol version, client identifier, authorization of the remote address,
     * user name / password. Any {@link Exception} raised while processing results in
     * {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE}.
     *
     * @param channelHandlerContext context of the channel the packet arrived on
     * @param mqttMessage the decoded MQTT message
     */
    @Override // cool.taomu.mqtt.broker.factory.IProcess
    public void request(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) {
        if (!(mqttMessage instanceof MqttConnectMessage)) {
            return;
        }
        LOG.info("执行 Connect 操作");
        // Start from a safe default so that state left over from a previous CONNECT
        // (or a null code) can never end up in the CONNACK if processing aborts early.
        this.code = MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
        this.sessionPresent = false;
        try {
            MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
            int version = mqttConnectMessage.variableHeader().version();
            LOG.info("mqtt version:{}", version);
            boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
            String clientIdentifier = mqttConnectMessage.payload().clientIdentifier();
            MqttUtils.setClientId(channelHandlerContext.channel(), clientIdentifier);
            String userName = mqttConnectMessage.payload().userName();
            byte[] passwordInBytes = mqttConnectMessage.payload().passwordInBytes();
            int keepAliveTimeSeconds = mqttConnectMessage.variableHeader().keepAliveTimeSeconds();
            LOG.info("clientId:{},cleanSession:{}", clientIdentifier, isCleanSession);

            if (!CheckConnect.version(version)) {
                this.code = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
            } else if (!CheckConnect.clientId(channelHandlerContext.channel(), clientIdentifier)) {
                this.code = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
            } else if (!CheckConnect.authorized(MqttUtils.getRemoteAddr(channelHandlerContext.channel()), clientIdentifier)) {
                this.code = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
            } else if (!CheckConnect.userAuth(clientIdentifier, userName, passwordInBytes)) {
                this.code = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
            } else {
                if (!keepAlive(clientIdentifier, channelHandlerContext, keepAliveTimeSeconds)) {
                    throw new IllegalStateException(String.format("set heartbeat failure clientId:%s,heartbeatSec:%d", clientIdentifier, keepAliveTimeSeconds));
                }
                boolean present = createSession(clientIdentifier, channelHandlerContext, isCleanSession);
                storeWill(mqttConnectMessage, clientIdentifier);
                // Publish the accepted state only once every setup step has succeeded, so a
                // failure half-way through is still reported as SERVER_UNAVAILABLE.
                this.sessionPresent = present;
                this.code = MqttConnectReturnCode.CONNECTION_ACCEPTED;
            }
        } catch (Throwable th) {
            if (!(th instanceof Exception)) {
                // Errors are not handled here; the finally block still answers the client
                // with the SERVER_UNAVAILABLE default before the error propagates.
                this.code = MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
                this.sessionPresent = false;
                throw Exceptions.sneakyThrow(th);
            }
            LOG.error("服务不可用 :", th);
            this.code = MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
            this.sessionPresent = false;
        } finally {
            // Single exit point: exactly one CONNACK per CONNECT, whatever happened above.
            response(channelHandlerContext, mqttMessage);
        }
    }

    /**
     * Writes a CONNACK carrying the current return code and session-present flag.
     * When the return code is anything other than {@code CONNECTION_ACCEPTED}, the
     * channel is closed once the write completes, as MQTT 3.1.1 requires the server to
     * close the network connection after a non-zero CONNACK.
     *
     * @param channelHandlerContext context used to write the CONNACK
     * @param mqttMessage the message being answered (not used when building the reply)
     * @return the future of the write operation
     */
    public ChannelFuture response(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(this.code, this.sessionPresent);
        ChannelFuture future = channelHandlerContext.writeAndFlush(new MqttConnAckMessage(fixedHeader, variableHeader));
        if (this.code != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
            future.addListener(io.netty.channel.ChannelFutureListener.CLOSE);
        }
        return future;
    }

    /**
     * Stores a fresh {@link ClientSessionEntity} for the client, replacing whatever the
     * storage held under the same client id.
     *
     * <p>NOTE(review): the returned flag is derived from the clean-session flag alone.
     * MQTT 3.1.1 ties "session present" to whether the server already held state for the
     * client; this method does not look up an existing session — confirm this is intended.
     *
     * @param str client identifier
     * @param channelHandlerContext context of the client's channel, kept in the session
     * @param z the clean-session flag from the CONNECT packet
     * @return {@code true} when {@code z} is {@code false}, otherwise {@code false}
     */
    protected Boolean createSession(String str, ChannelHandlerContext channelHandlerContext, boolean z) {
        LOG.info("记录用户session：{}", str);
        ClientSessionEntity session = new ClientSessionEntity();
        session.setClientId(str);
        session.setCtx(channelHandlerContext);
        session.setCleanStatus(z);
        this.cache.put(SESSION_STORE, str, session);
        return !z;
    }

    /**
     * Saves the client's will message when the CONNECT packet has the will flag set.
     *
     * @param mqttConnectMessage the CONNECT packet
     * @param str client identifier, used as sender id and storage key
     * @return {@code 0} when no will is present, otherwise the value returned by the
     *         storage's {@code put} call
     */
    protected int storeWill(MqttConnectMessage mqttConnectMessage, String str) {
        if (!mqttConnectMessage.variableHeader().isWillFlag()) {
            return 0;
        }
        LOG.info("保存遗嘱消息 ： clientId:{}", str);
        MessageEntity will = new MessageEntity();
        will.setSenderId(str);
        will.setRetain(mqttConnectMessage.variableHeader().isWillRetain());
        will.setQos(mqttConnectMessage.variableHeader().willQos());
        will.setTopic(mqttConnectMessage.payload().willTopic());
        will.setPayload(mqttConnectMessage.payload().willMessageInBytes());
        return this.cache.put(WILL_STORE, str, will);
    }

    /**
     * Installs (or replaces) the {@link IdleStateHandler} at the head of the pipeline with
     * a reader-idle timeout of 1.5 times the client's keep-alive interval. A keep-alive of
     * {@code 0} yields a timeout of {@code 0}, which disables the reader-idle check.
     *
     * @param str client identifier (not used by this implementation)
     * @param channelHandlerContext context whose pipeline receives the handler
     * @param i keep-alive interval in seconds as sent by the client
     * @return always {@code true} in this implementation; {@link #request} treats
     *         {@code false} as a failure
     */
    public boolean keepAlive(String str, ChannelHandlerContext channelHandlerContext, int i) {
        LOG.info("设置keep alive");
        int readerIdleSeconds = (int) (i * KEEP_ALIVE_GRACE_FACTOR);
        if (channelHandlerContext.pipeline().names().contains(IDLE_HANDLER_NAME)) {
            channelHandlerContext.pipeline().remove(IDLE_HANDLER_NAME);
        }
        channelHandlerContext.pipeline().addFirst(IDLE_HANDLER_NAME, new IdleStateHandler(readerIdleSeconds, 0, 0));
        return true;
    }
}
