package onion.mqtt.server.processor;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import onion.mqtt.server.MqttServerConfig;
import onion.mqtt.server.MqttServerConst;
import onion.mqtt.server.MqttServerCreator;
import onion.mqtt.server.auth.IMqttServerConnectAuth;
import onion.mqtt.server.event.IMqttConnectStatusEvent;
import onion.mqtt.server.manager.MessageManager;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.manager.SubscribeManager;
import onion.mqtt.server.store.MessageStore;
import onion.mqtt.server.store.SessionStore;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:onion/mqtt/server/processor/ConnectProcessor.class */
/**
 * Processes an inbound MQTT CONNECT packet.
 *
 * <p>The processor validates the client identifier, optionally authenticates the client,
 * takes over any session already registered under the same client id, re-arms the
 * channel's idle handler from the client's keep-alive, registers the new session and
 * will message, answers with a CONNACK and finally notifies the connect-status listener
 * asynchronously.
 */
public class ConnectProcessor extends AbstractMqttServerProcessor<MqttConnectMessage> {
    static final Logger log = LoggerFactory.getLogger(ConnectProcessor.class);

    /** Name under which the idle-state handler is looked up in (and re-added to) the pipeline. */
    private static final String IDLE_HANDLER_NAME = "idle";

    private final MqttServerConfig config;
    /** Optional authenticator; when {@code null} every client is accepted. */
    private final IMqttServerConnectAuth connectAuth;
    /** Optional listener notified after a successful connect; may be {@code null}. */
    private final IMqttConnectStatusEvent connectStatusListener;

    /**
     * Creates a processor that reads its configuration, authenticator and status
     * listener from the given server creator.
     *
     * @param mqttServerCreator source of the server configuration and optional hooks
     */
    public ConnectProcessor(MqttServerCreator mqttServerCreator) {
        this.config = mqttServerCreator.getConfig();
        this.connectAuth = mqttServerCreator.getConnectAuth();
        this.connectStatusListener = mqttServerCreator.getConnectStatusListener();
    }

    /**
     * Handles one CONNECT packet received on {@code channel}.
     *
     * <p>A blank client identifier or a failed authentication is answered with the
     * matching refusal CONNACK and the channel is closed. Otherwise the connection is
     * accepted and the CONNACK's Session Present flag reflects whether previously
     * stored session state was kept for this client.
     *
     * @param channel            the channel the CONNECT packet arrived on
     * @param mqttConnectMessage the decoded CONNECT packet
     */
    @Override
    public void process(Channel channel, MqttConnectMessage mqttConnectMessage) {
        MqttConnectPayload payload = mqttConnectMessage.payload();
        String clientIdentifier = payload.clientIdentifier();
        if (StringUtils.isBlank(clientIdentifier)) {
            rejectConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }

        String userName = payload.userName();
        String password = payload.password();
        if (this.connectAuth != null
                && !this.connectAuth.verifyAuthenticate(channel, clientIdentifier, userName, password)) {
            rejectConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }

        // Tag the channel so later handlers can resolve the owning client.
        channel.attr(AttributeKey.valueOf(MqttServerConst.CLIENT_ID)).set(clientIdentifier);

        boolean cleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        boolean sessionPresent = resolvePreviousSession(clientIdentifier, cleanSession);
        int readerIdleSeconds =
                installKeepAlive(channel, mqttConnectMessage.variableHeader().keepAliveTimeSeconds());

        SessionStore sessionStore = new SessionStore();
        sessionStore.setClientId(clientIdentifier);
        sessionStore.setChannel(channel);
        sessionStore.setExpire(readerIdleSeconds);
        sessionStore.setCleanSession(cleanSession);
        SessionManager.getInstance().addSession(sessionStore);

        storeWillMessage(clientIdentifier, mqttConnectMessage, payload);

        // Session Present must be 0 for a clean session and may only be 1 when stored
        // state was actually retained (MQTT 3.1.1, section 3.2.2.2).
        writeAndFlush(channel, MqttMessageBuilders.connAck()
                .returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED)
                .sessionPresent(sessionPresent)
                .build());

        publishOnlineEvent(channel, clientIdentifier);
    }

    /** Sends a refusal CONNACK with the given return code and closes the channel. */
    private void rejectConnect(Channel channel, MqttConnectReturnCode returnCode) {
        writeAndFlush(channel, MqttMessageBuilders.connAck().returnCode(returnCode).sessionPresent(false).build());
        close(channel);
    }

    /**
     * Closes the channel of any session already registered for the client and decides
     * whether its stored state survives.
     *
     * <p>State is discarded when no session is registered, when the previous session was
     * a clean session, or when the new connection requests a clean session.
     *
     * @return {@code true} if previously stored session state was kept
     */
    private boolean resolvePreviousSession(String clientIdentifier, boolean cleanSession) {
        if (!SessionManager.getInstance().hasSession(clientIdentifier)) {
            SubscribeManager.getInstance().clearSubscribeByClient(clientIdentifier);
            return false;
        }

        boolean previousWasClean = false;
        SessionStore previous = SessionManager.getInstance().getSession(clientIdentifier);
        if (previous != null) {
            // Only one connection per client id: drop the older one.
            if (previous.getChannel() != null) {
                previous.getChannel().close();
            }
            previousWasClean = previous.isCleanSession();
        }

        if (previousWasClean || cleanSession) {
            SubscribeManager.getInstance().clearSubscribeByClient(clientIdentifier);
            SessionManager.getInstance().removeSession(clientIdentifier);
            return false;
        }
        return true;
    }

    /**
     * Replaces the pipeline's idle handler so the reader timeout is 1.5 times the
     * client's keep-alive. Nothing is changed when the client sent no keep-alive or the
     * pipeline has no handler named {@code "idle"}.
     *
     * @return the reader idle timeout in seconds that was installed, or {@code 0} if the
     *         pipeline was left untouched
     */
    private int installKeepAlive(Channel channel, int keepAliveSeconds) {
        if (keepAliveSeconds <= 0 || !channel.pipeline().names().contains(IDLE_HANDLER_NAME)) {
            return 0;
        }
        int readerIdleSeconds = Math.round(keepAliveSeconds * 1.5f);
        channel.pipeline().remove(IDLE_HANDLER_NAME);
        channel.pipeline().addFirst(IDLE_HANDLER_NAME,
                new IdleStateHandler(readerIdleSeconds, this.config.getKeepAlive(), this.config.getKeepAlive(), TimeUnit.SECONDS));
        return readerIdleSeconds;
    }

    /**
     * Drops any will message stored for the client and, if the CONNECT packet carries
     * the will flag, stores the new one.
     */
    private void storeWillMessage(String clientIdentifier, MqttConnectMessage mqttConnectMessage,
            MqttConnectPayload payload) {
        MessageManager.getInstance().removeWillMessageByClient(clientIdentifier);
        if (!mqttConnectMessage.variableHeader().isWillFlag()) {
            return;
        }
        MessageStore willMessage = new MessageStore();
        willMessage.setClientId(clientIdentifier);
        willMessage.setTopic(payload.willTopic());
        willMessage.setQoS(mqttConnectMessage.variableHeader().willQos());
        willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain());
        if (payload.willMessageInBytes() != null) {
            willMessage.setPayload(payload.willMessageInBytes());
        }
        willMessage.setTimestamp(System.currentTimeMillis());
        willMessage.setNodeId(this.config.getNodeId());
        MessageManager.getInstance().addWillMessage(willMessage);
    }

    /**
     * Notifies the connect-status listener, if one is configured, on an asynchronous
     * task so a slow or failing listener cannot affect the connect handshake.
     */
    private void publishOnlineEvent(Channel channel, String clientIdentifier) {
        if (this.connectStatusListener == null) {
            return;
        }
        CompletableFuture.runAsync(() -> {
            try {
                this.connectStatusListener.online(channel, clientIdentifier);
            } catch (Throwable th) {
                // Pass the throwable as the last argument so the stack trace is logged.
                log.error("connect publishEvent error clientId: {}", clientIdentifier, th);
            }
        });
    }
}
