package onion.mqtt.server.processor;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import onion.mqtt.server.MqttServerBuilder;
import onion.mqtt.server.event.IMqttServerMessageListener;
import onion.mqtt.server.manager.MessageManager;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.manager.SubscribeManager;
import onion.mqtt.server.store.MessageStore;
import onion.mqtt.server.store.SessionStore;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:onion/mqtt/server/processor/PublishProcessor.class */
/**
 * Processes inbound MQTT PUBLISH packets.
 *
 * <p>For each packet this processor:
 * <ol>
 *   <li>acknowledges it to the publisher (PUBACK for QoS 1, PUBREC for QoS 2, nothing for QoS 0);</li>
 *   <li>updates the retained-message store when the RETAIN flag is set;</li>
 *   <li>forwards the message to every matching subscriber except the publisher itself;</li>
 *   <li>notifies the configured {@link IMqttServerMessageListener}, if any, asynchronously.</li>
 * </ol>
 */
public class PublishProcessor extends AbstractMqttServerProcessor<MqttPublishMessage> {
    static final Logger log = LoggerFactory.getLogger(PublishProcessor.class);

    /** Largest packet identifier representable on the wire (16-bit, non-zero). */
    private static final int MAX_PACKET_ID = 0xFFFF;

    /** Source of packet identifiers for outbound PUBLISH packets; cycles through 1..65535. */
    private static final AtomicInteger NEXT_PACKET_ID = new AtomicInteger();

    /** Optional application callback invoked for every received publish; may be {@code null}. */
    private final IMqttServerMessageListener messageEvent;

    /**
     * Creates a processor that reports received messages to the listener configured on the builder.
     *
     * @param mqttServerBuilder server configuration from which the message listener is read
     */
    public PublishProcessor(MqttServerBuilder mqttServerBuilder) {
        this.messageEvent = mqttServerBuilder.getMessageListener();
    }

    /**
     * Acknowledges the publish according to its QoS and then distributes it.
     *
     * @param channel            channel the PUBLISH packet arrived on
     * @param mqttPublishMessage the received packet; a {@code null} message is ignored
     */
    @Override // onion.mqtt.server.processor.AbstractMqttServerProcessor
    public void process(Channel channel, MqttPublishMessage mqttPublishMessage) {
        if (mqttPublishMessage == null) {
            return;
        }
        String clientId = getClientId(channel);
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
        String topic = variableHeader.topicName();
        int packetId = variableHeader.packetId();
        log.debug("Publish - clientId: {}, topic: {}, mqttQos: {}, packetId: {}", clientId, topic, qosLevel, packetId);

        switch (qosLevel) {
            case AT_LEAST_ONCE:
                if (packetId != -1) {
                    MqttMessage pubAck = MqttMessageBuilders.pubAck().packetId(packetId).build();
                    channel.eventLoop().execute(() -> channel.writeAndFlush(pubAck));
                    log.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topic, qosLevel, packetId);
                }
                break;
            case EXACTLY_ONCE:
                if (packetId != -1) {
                    MqttMessage pubRec = new MqttMessage(
                            new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0),
                            MqttMessageIdVariableHeader.from(packetId));
                    channel.eventLoop().execute(() -> channel.writeAndFlush(pubRec));
                    log.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topic, qosLevel, packetId);
                }
                break;
            default:
                // QoS 0 (and any unrecognised level) requires no acknowledgement.
                break;
        }
        invokeListenerForPublish(channel, clientId, qosLevel, topic, mqttPublishMessage);
    }

    /**
     * Applies retain handling, fans the message out to subscribers and notifies the listener.
     *
     * @param channel  channel the publish arrived on; passed through to the listener
     * @param clientId identifier of the publishing client
     * @param qos      QoS level of the received publish, reused for the forwarded copies
     * @param topic    topic the message was published to
     * @param message  the received PUBLISH packet
     */
    private void invokeListenerForPublish(Channel channel, String clientId, MqttQoS qos, String topic, MqttPublishMessage message) {
        log.debug("publish topic: {}", topic);
        MqttFixedHeader fixedHeader = message.fixedHeader();
        boolean retain = fixedHeader.isRetain();
        // Snapshot the payload once, while the inbound buffer is still guaranteed to be alive.
        byte[] payloadBytes = copyPayload(message);

        if (retain) {
            if (MqttQoS.AT_MOST_ONCE == qos || payloadBytes.length == 0) {
                MessageManager.getInstance().removeRetainMessage(topic);
            } else {
                MessageStore retained = new MessageStore();
                retained.setClientId(clientId);
                retained.setTopic(topic);
                retained.setQoS(fixedHeader.qosLevel().value());
                retained.setRetain(true);
                retained.setTimestamp(System.currentTimeMillis());
                retained.setPayload(payloadBytes);
                MessageManager.getInstance().addRetainMessage(retained);
            }
        }

        SubscribeManager.getInstance().searchSubscribe(topic).forEach(subscribeStore -> {
            // Do not echo the message back to the client that published it.
            if (clientId.equals(subscribeStore.getClientId())) {
                return;
            }
            log.debug("publish topic: {}, clientId: {}", topic, subscribeStore.getClientId());
            SessionStore session = SessionManager.getInstance().getSession(subscribeStore.getClientId());
            if (session == null) {
                return;
            }
            Channel target = session.getChannel();
            if (target == null) {
                return;
            }
            // Each outbound write consumes and releases its buffer, so every subscriber needs its
            // own retained view with independent reader indices. The reference is taken here,
            // synchronously, so it stays valid even if the inbound packet is released before the
            // write task runs on the subscriber's event loop.
            MqttPublishMessage outbound = MqttMessageBuilders.publish()
                    .topicName(topic)
                    .qos(qos)
                    .payload(message.payload().retainedDuplicate())
                    .retained(retain)
                    .messageId(nextPacketId())
                    .build();
            target.eventLoop().execute(() -> target.writeAndFlush(outbound));
        });

        if (this.messageEvent == null) {
            return;
        }
        // The listener API takes text; decode explicitly as UTF-8 instead of the platform default.
        String payloadText = new String(payloadBytes, StandardCharsets.UTF_8);
        CompletableFuture.runAsync(() -> {
            try {
                this.messageEvent.onMessage(channel, clientId, topic, qos, payloadText);
            } catch (Throwable th) {
                // Listener failures must not affect the broker; keep the stack trace for diagnosis.
                log.error("publish publishEvent fail clientId: {}.", clientId, th);
            }
        });
    }

    /**
     * Copies the readable bytes of the packet's payload into a new array.
     *
     * <p>Uses an absolute read so the payload's reader index is left untouched.
     *
     * @param message the PUBLISH packet whose payload is copied
     * @return a new array holding the payload bytes; empty when the payload is empty
     */
    private static byte[] copyPayload(MqttPublishMessage message) {
        byte[] bytes = new byte[message.payload().readableBytes()];
        message.payload().getBytes(message.payload().readerIndex(), bytes);
        return bytes;
    }

    /**
     * Returns the next packet identifier for an outbound PUBLISH.
     *
     * @return a value in the range 1..65535, wrapping back to 1 after 65535
     */
    private static int nextPacketId() {
        return NEXT_PACKET_ID.updateAndGet(id -> id >= MAX_PACKET_ID ? 1 : id + 1);
    }
}
