package cn.hamm.airpower.websocket;

import cn.hamm.airpower.config.Configs;
import cn.hamm.airpower.exception.ServiceException;
import cn.hamm.airpower.model.Json;
import cn.hamm.airpower.util.Utils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * WebSocket text handler that forwards messages received on Redis or MQTT channels to the
 * connected WebSocket session.
 *
 * <p>For every established session the handler resolves a user id from the query string of the
 * handshake URI, starts the listener selected by {@code Configs.getWebsocketConfig().getSupport()},
 * and records the per-session resources in the maps below so they can be released when the
 * session closes.
 *
 * <p>All outgoing frames written by this class are sent while holding the monitor of the target
 * {@link WebSocketSession}, so that channel listener threads and the WebSocket container thread
 * do not write to the same session at the same time.
 */
@Component
public class WebSocketHandler extends TextWebSocketHandler implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /** Logical name of the channel every session is subscribed to. */
    public static final String CHANNEL_ALL = "WEBSOCKET_ALL";

    /** Prefix of the per-user channel; the user id is appended to it. */
    public static final String CHANNEL_USER_PREFIX = "WEBSOCKET_USER_";

    /** Redis connection opened for each session, keyed by session id. */
    protected final ConcurrentHashMap<String, RedisConnection> redisConnectionHashMap = new ConcurrentHashMap<>();

    /** MQTT client opened for each session, keyed by session id. */
    protected final ConcurrentHashMap<String, MqttClient> mqttClientHashMap = new ConcurrentHashMap<>();

    /** User id resolved for each session, keyed by session id. */
    protected final ConcurrentHashMap<String, Long> userIdHashMap = new ConcurrentHashMap<>();

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Handles an incoming text frame.
     *
     * <p>A frame equal (ignoring case) to the configured ping text is answered with the configured
     * pong text. Any other frame is parsed as a {@link WebSocketPayload} and passed to
     * {@link #onWebSocketPayload(WebSocketPayload, WebSocketSession)}. Failures are logged and
     * never propagated to the container.
     *
     * @param webSocketSession session the frame arrived on
     * @param textMessage      the received frame
     */
    protected final void handleTextMessage(@NonNull WebSocketSession webSocketSession, @NotNull TextMessage textMessage) {
        String payloadText = textMessage.getPayload();
        if (Configs.getWebsocketConfig().getPing().equalsIgnoreCase(payloadText)) {
            try {
                sendText(webSocketSession, Configs.getWebsocketConfig().getPong());
            } catch (IOException e) {
                // Pass the exception as the trailing argument so the stack trace is kept.
                log.error("发送Websocket消息失败: {}", e.getMessage(), e);
            }
            return;
        }
        try {
            onWebSocketPayload((WebSocketPayload) Json.parse(payloadText, WebSocketPayload.class), webSocketSession);
        } catch (Exception e) {
            log.error("解析Websocket事件负载失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Wraps the payload in a {@link WebSocketEvent}, serializes it to JSON and sends it to the
     * session.
     *
     * @param webSocketSession target session
     * @param webSocketPayload payload to send
     * @throws RuntimeException wrapping the {@link IOException} if the frame cannot be sent
     */
    protected final void sendWebSocketPayload(@NotNull WebSocketSession webSocketSession, @NotNull WebSocketPayload webSocketPayload) {
        try {
            sendText(webSocketSession, Json.toString(WebSocketEvent.create(webSocketPayload)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Callback invoked for every non-ping text frame after it has been parsed.
     *
     * <p>The default implementation only logs the payload type and data; subclasses may override
     * it.
     *
     * @param webSocketPayload parsed payload
     * @param webSocketSession session the payload arrived on
     */
    public void onWebSocketPayload(@NotNull WebSocketPayload webSocketPayload, @NotNull WebSocketSession webSocketSession) {
        log.info("负载类型: {}, 负载内容: {}", webSocketPayload.getType(), webSocketPayload.getData());
    }

    /**
     * Authenticates a newly opened session and starts the configured channel listener.
     *
     * <p>The whole query string of the handshake URI is passed to the security util as the access
     * token. When there is no URI or no query string the session is closed.
     *
     * @param webSocketSession the newly established session
     * @throws RuntimeException if the configured support type is not handled here
     */
    public final void afterConnectionEstablished(@NonNull WebSocketSession webSocketSession) {
        // A missing URI carries no token either, so it is rejected the same way as a missing query
        // instead of leaving an unauthenticated session open.
        String query = Objects.isNull(webSocketSession.getUri()) ? null : webSocketSession.getUri().getQuery();
        if (Objects.isNull(query)) {
            log.warn("没有传入AccessToken 即将关闭连接");
            closeConnection(webSocketSession);
            return;
        }
        long userId = Utils.getSecurityUtil().getIdFromAccessToken(query);
        switch (Configs.getWebsocketConfig().getSupport()) {
            case REDIS:
                startRedisListener(webSocketSession, userId);
                break;
            case MQTT:
                startMqttListener(webSocketSession, userId);
                break;
            case NO:
                break;
            default:
                throw new RuntimeException("WebSocket暂不支持");
        }
        this.userIdHashMap.put(webSocketSession.getId(), Long.valueOf(userId));
    }

    /**
     * Sends one text frame while holding the session monitor, so frames written by different
     * threads to the same session are not interleaved.
     */
    private void sendText(WebSocketSession session, String text) throws IOException {
        synchronized (session) {
            session.sendMessage(new TextMessage(text));
        }
    }

    /**
     * Forwards a message received on a subscribed channel to the session; failures are logged
     * and not propagated to the listener thread.
     */
    private void onChannelMessage(@NotNull String str, @NonNull WebSocketSession webSocketSession) {
        try {
            sendText(webSocketSession, str);
        } catch (Exception e) {
            log.error("消息发送失败", e);
        }
    }

    /**
     * Opens a Redis connection for the session and subscribes it to the broadcast channel and the
     * user's own channel, forwarding every received body (decoded as UTF-8) to the session.
     */
    private void startRedisListener(@NotNull WebSocketSession webSocketSession, long userId) {
        byte[] broadcastChannel = getRealChannel(CHANNEL_ALL).getBytes(StandardCharsets.UTF_8);
        byte[] userChannel = getRealChannel(CHANNEL_USER_PREFIX + userId).getBytes(StandardCharsets.UTF_8);
        RedisConnection connection = this.redisConnectionFactory.getConnection();
        // Registered before subscribing so the connection can be found and closed when the
        // session ends.
        this.redisConnectionHashMap.put(webSocketSession.getId(), connection);
        connection.subscribe(
                (message, pattern) -> onChannelMessage(new String(message.getBody(), StandardCharsets.UTF_8), webSocketSession),
                broadcastChannel,
                userChannel
        );
    }

    /**
     * Creates and connects an MQTT client for the session and subscribes it to the broadcast
     * topic and the user's own topic, forwarding every received payload (decoded as UTF-8) to the
     * session.
     *
     * <p>The client stays open after this method returns; it is released in
     * {@link #afterConnectionClosed(WebSocketSession, CloseStatus)}. It is only closed here when
     * the setup fails.
     *
     * @throws ServiceException wrapping the {@link MqttException} if the setup fails
     */
    private void startMqttListener(@NotNull final WebSocketSession webSocketSession, long userId) {
        MqttClient mqttClient = null;
        try {
            mqttClient = Utils.getMqttUtil().createClient();
            mqttClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable th) {
                    log.warn("MQTT连接已断开", th);
                }

                public void messageArrived(String str, MqttMessage mqttMessage) {
                    WebSocketHandler.this.onChannelMessage(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8), webSocketSession);
                }

                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                }
            });
            mqttClient.connect(Utils.getMqttUtil().createOption());
            // NOTE(review): these topics are not passed through getRealChannel(), unlike the Redis
            // listener and mqttSubscribe() - confirm against the publisher which form is expected.
            mqttClient.subscribe(new String[]{CHANNEL_ALL, CHANNEL_USER_PREFIX + userId});
            this.mqttClientHashMap.put(webSocketSession.getId(), mqttClient);
        } catch (MqttException e) {
            // Release the half-initialized client; it was never published to the map.
            closeMqttClient(mqttClient);
            throw new ServiceException(e);
        }
    }

    /**
     * Disconnects (when connected) and closes the MQTT client; failures are logged only.
     * A {@code null} client is ignored.
     */
    private void closeMqttClient(MqttClient mqttClient) {
        if (Objects.isNull(mqttClient)) {
            return;
        }
        try {
            // close() is rejected by the client while it is still connected.
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.close();
        } catch (Exception e) {
            log.error("关闭MQTT客户端失败: {}", e.getMessage(), e);
        }
    }

    /** Closes the session, logging the failure if the close itself fails. */
    private void closeConnection(@NotNull WebSocketSession webSocketSession) {
        try {
            webSocketSession.close();
        } catch (IOException e) {
            log.error("关闭Websocket失败", e);
        }
    }

    /**
     * Releases everything registered for the closed session: the Redis connection, the MQTT
     * client and the user id entry.
     *
     * <p>Each resource is released independently, so a failure while closing one of them does not
     * prevent the others from being cleaned up. Failures are logged and not propagated.
     *
     * @param webSocketSession the closed session
     * @param closeStatus      close status reported by the container (not used)
     */
    public final void afterConnectionClosed(@NotNull WebSocketSession webSocketSession, @NotNull CloseStatus closeStatus) {
        String sessionId = webSocketSession.getId();
        RedisConnection redisConnection = this.redisConnectionHashMap.remove(sessionId);
        if (Objects.nonNull(redisConnection)) {
            try {
                redisConnection.close();
            } catch (Exception e) {
                log.error("关闭Redis连接失败: {}", e.getMessage(), e);
            }
        }
        closeMqttClient(this.mqttClientHashMap.remove(sessionId));
        this.userIdHashMap.remove(sessionId);
    }

    /**
     * {@link MessageListener} callback; intentionally does nothing. Per-session Redis messages are
     * handled by the listener registered in {@code startRedisListener}.
     */
    @Contract(pure = true)
    public final void onMessage(@NotNull Message message, byte[] bArr) {
    }

    /**
     * Subscribes the session's Redis subscription to an additional channel.
     *
     * @param str              logical channel name (without the configured prefix)
     * @param webSocketSession session whose Redis subscription is extended
     * @throws RuntimeException if the session has no Redis connection or subscription
     */
    protected final void redisSubscribe(@NotNull String str, WebSocketSession webSocketSession) {
        String realChannel = getRealChannel(str);
        log.info("REDIS开始订阅频道: {}", realChannel);
        getRedisSubscription(webSocketSession).subscribe(realChannel.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Subscribes the session's MQTT client to an additional topic.
     *
     * @param str              logical channel name (without the configured prefix)
     * @param webSocketSession session whose MQTT client is used
     * @throws RuntimeException if the session has no MQTT client or the subscribe call fails
     */
    protected final void mqttSubscribe(String str, WebSocketSession webSocketSession) {
        String realChannel = getRealChannel(str);
        log.info("MQTT开始订阅频道: {}", realChannel);
        try {
            getMqttClient(webSocketSession).subscribe(realChannel);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the effective channel name: the configured channel prefix, an underscore, then the
     * given name.
     *
     * @param str logical channel name
     * @return the prefixed channel name
     */
    @NotNull
    protected final String getRealChannel(String str) {
        return Configs.getWebsocketConfig().getChannelPrefix() + "_" + str;
    }

    /**
     * Removes a channel from the session's Redis subscription.
     *
     * @param str              logical channel name (without the configured prefix)
     * @param webSocketSession session whose Redis subscription is reduced
     * @throws RuntimeException if the session has no Redis connection or subscription
     */
    protected final void redisUnSubscribe(@NotNull String str, WebSocketSession webSocketSession) {
        String realChannel = getRealChannel(str);
        log.info("REDIS取消订阅频道: {}", realChannel);
        getRedisSubscription(webSocketSession).unsubscribe(realChannel.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Unsubscribes the session's MQTT client from a topic.
     *
     * @param str              logical channel name (without the configured prefix)
     * @param webSocketSession session whose MQTT client is used
     * @throws RuntimeException if the session has no MQTT client or the unsubscribe call fails
     */
    protected final void mqttUnSubscribe(String str, WebSocketSession webSocketSession) {
        String realChannel = getRealChannel(str);
        log.info("MQTT取消订阅频道: {}", realChannel);
        try {
            getMqttClient(webSocketSession).unsubscribe(realChannel);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the MQTT client registered for the session.
     *
     * @param webSocketSession session to look up
     * @return the client stored for the session id
     * @throws RuntimeException if no client is registered for the session
     */
    protected final MqttClient getMqttClient(@NotNull WebSocketSession webSocketSession) {
        MqttClient mqttClient = this.mqttClientHashMap.get(webSocketSession.getId());
        if (Objects.isNull(mqttClient)) {
            throw new RuntimeException("mqtt client is null");
        }
        return mqttClient;
    }

    /**
     * Returns the active Redis subscription of the connection registered for the session.
     *
     * @param webSocketSession session to look up
     * @return the subscription of the session's Redis connection
     * @throws RuntimeException if the session has no Redis connection, or the connection has no
     *                          subscription
     */
    protected final Subscription getRedisSubscription(@NotNull WebSocketSession webSocketSession) {
        RedisConnection redisConnection = this.redisConnectionHashMap.get(webSocketSession.getId());
        if (Objects.isNull(redisConnection)) {
            throw new RuntimeException("redisConnection is null");
        }
        Subscription subscription = redisConnection.getSubscription();
        if (Objects.isNull(subscription)) {
            throw new RuntimeException("subscription is null");
        }
        return subscription;
    }
}
