package cool.taomu.mqtt.broker.factory;

import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.impl.PublishObservable;
import cool.taomu.mqtt.broker.impl.Retain;
import cool.taomu.mqtt.broker.impl.RetainObservable;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.mqtt.broker.utils.inter.IObservable;
import cool.taomu.mqtt.broker.utils.inter.IObserver;
import cool.taomu.storage.inter.IStorage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cool/taomu/mqtt/broker/factory/PublishRequest.class */
public class PublishRequest implements IProcess {
    private IStorage cache = new DataStorage();
    private static final Logger LOG = LoggerFactory.getLogger(PublishRequest.class);
    private static final IObservable<IObserver> retainObservable = RetainObservable.getInstance();
    private static final IObservable<IObserver> observable = PublishObservable.getInstance();

    /* renamed from: cool.taomu.mqtt.broker.factory.PublishRequest$1, reason: invalid class name */
    /* loaded from: input_file:cool/taomu/mqtt/broker/factory/PublishRequest$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS = new int[MqttQoS.values().length];

        static {
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Override // cool.taomu.mqtt.broker.factory.IProcess
    public void request(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) {
        try {
            try {
                if (!(mqttMessage instanceof MqttPublishMessage)) {
                    ReferenceCountUtil.release(mqttMessage.payload());
                    return;
                }
                MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
                MessageEntity messageEntity = new MessageEntity();
                messageEntity.setSenderId(MqttUtils.getClientId(channelHandlerContext.channel()));
                LOG.info("执行了MQTT Publish 命令 : " + messageEntity.getSenderId());
                MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
                messageEntity.setQos(qosLevel.ordinal());
                messageEntity.setTopic(mqttPublishMessage.variableHeader().topicName());
                messageEntity.setPayload(((MqttPublishMessage) mqttMessage).payload());
                messageEntity.setType(Integer.valueOf(mqttMessage.fixedHeader().messageType().value()));
                messageEntity.setDup(mqttPublishMessage.fixedHeader().isDup());
                messageEntity.setRetain(mqttPublishMessage.fixedHeader().isRetain());
                messageEntity.setMsgId(mqttPublishMessage.variableHeader().packetId());
                messageEntity.setSenderChannel(channelHandlerContext.channel());
                if (qosLevel != null) {
                    switch (AnonymousClass1.$SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[qosLevel.ordinal()]) {
                        case 1:
                        case 2:
                            LOG.info(String.format("Qos0 and Qos2 message,clientId=%s", messageEntity.getSenderId()));
                            retainMessage(messageEntity);
                            publishMessage(messageEntity);
                            break;
                        case 3:
                            LOG.info(String.format("Qos1 message,clientId=%s", messageEntity.getSenderId()));
                            retainMessage(messageEntity);
                            publishMessage(messageEntity);
                            channelHandlerContext.writeAndFlush(new MqttPubAckMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageEntity.getMsgId())));
                            break;
                        default:
                            LOG.info(String.format("Wrong mqtt message,clientId=%s", messageEntity.getSenderId()));
                            break;
                    }
                } else {
                    LOG.info(String.format("Wrong mqtt message,clientId=%s", messageEntity.getSenderId()));
                }
                ReferenceCountUtil.release(mqttMessage.payload());
            } catch (Throwable th) {
                if (!(th instanceof Exception)) {
                    throw Exceptions.sneakyThrow(th);
                }
                LOG.debug("执行了MQTT Publish 命令出错了 : ", (Exception) th);
                ReferenceCountUtil.release(mqttMessage.payload());
            }
        } catch (Throwable th2) {
            ReferenceCountUtil.release(mqttMessage.payload());
            throw th2;
        }
    }

    public void retainMessage(MessageEntity messageEntity) {
        LOG.debug("clientId 为 {} 是否存在 Retain 数据 {}, 接受到的数据为 {} ", new Object[]{messageEntity.getSenderId(), Boolean.valueOf(messageEntity.isRetain()), new String(messageEntity.getPayload())});
        this.cache.put("mqtt-message", messageEntity.getSenderId(), messageEntity);
        if (messageEntity.isRetain()) {
            int qos = messageEntity.getQos();
            byte[] payload = messageEntity.getPayload();
            if (qos != MqttQoS.AT_MOST_ONCE.ordinal() && payload != null && payload.length != 0) {
                LOG.info("保存 clientId 为 {} 的Retain数据", messageEntity.getSenderId());
                retainObservable.register(IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList(new String[]{messageEntity.getSenderId(), messageEntity.getTopic()})), "-"), new Retain(messageEntity));
            } else {
                LOG.info("清空 clientId 为 {} 的Retain数据", messageEntity.getSenderId());
                retainObservable.unregister(IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList(new String[]{messageEntity.getSenderId(), messageEntity.getTopic()})), "-"));
            }
        }
    }

    public void publishMessage(MessageEntity messageEntity) {
        LOG.debug("推送消息");
        observable.publish(messageEntity, new Object[0]);
    }
}
