package cool.taomu.mqtt.broker.impl;

import cool.taomu.mqtt.broker.entity.ClientSessionEntity;
import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.entity.PublishEntity;
import cool.taomu.mqtt.broker.entity.TopicEntity;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.mqtt.broker.utils.inter.IObservable;
import cool.taomu.mqtt.broker.utils.inter.IObserver;
import cool.taomu.storage.inter.IStorage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.oro.text.perl.Perl5Util;
import org.eclipse.xtend.lib.annotations.Accessors;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Observer that forwards one stored {@link MessageEntity} to a subscription.
 *
 * <p>Whenever {@link #publish(IObservable, Object)} is notified with a {@link TopicEntity},
 * the held message is sent to that subscription's client if the subscription's topic filter
 * matches the message topic. The client's channel is looked up in the {@code "mqtt-session"}
 * area of the {@link IStorage}; QoS 2 deliveries are additionally recorded under
 * {@code "mqtt-qos2-message"}.
 *
 * <p>NOTE(review): outbound packets are built with the RETAIN flag hard-coded to {@code false}.
 * If this class is used to replay retained messages to new subscribers, MQTT 3.1.1 expects the
 * flag to be set in that case - confirm against the caller before changing it.
 */
@Accessors
public class Retain implements IObserver {
    private static final Logger LOG = LoggerFactory.getLogger(Retain.class);

    /** Source of the instance numbers handed out by the no-argument constructor. */
    private static final AtomicInteger count = new AtomicInteger(0);

    /** Storage used for session lookup and for recording QoS 2 deliveries. */
    private IStorage cache = new DataStorage();

    /** The message this observer delivers; may be {@code null} until {@link #setMsg} is called. */
    private MessageEntity msg;

    /** Instance number; stays 0 for instances created with a message. */
    private int number;

    /**
     * Creates an observer that delivers the given message. Does not consume an instance number.
     *
     * @param messageEntity the message to deliver to matching subscriptions
     */
    public Retain(MessageEntity messageEntity) {
        this.msg = messageEntity;
    }

    /**
     * Creates an observer without a message and assigns it the next instance number.
     */
    public Retain() {
        // Use the value returned by the increment itself: reading the counter in a second
        // step could observe another thread's increment and hand out a duplicate number.
        this.number = count.incrementAndGet();
    }

    /**
     * Tells whether an MQTT topic filter matches a concrete topic name, comparing the two
     * level by level (levels are separated by {@code '/'}).
     *
     * <ul>
     *   <li>{@code +} matches exactly one level, including an empty one;</li>
     *   <li>{@code #} is only honoured as the last level and matches the parent level plus
     *       any number of child levels;</li>
     *   <li>a filter starting with a wildcard never matches a topic starting with {@code $}.</li>
     * </ul>
     *
     * @param filter the subscription's topic filter
     * @param topic  the topic the message was published to
     * @return {@code true} if the filter selects the topic; {@code false} otherwise, including
     *         when either argument is {@code null} or empty
     */
    private static boolean topicMatches(String filter, String topic) {
        if (filter == null || topic == null || filter.isEmpty() || topic.isEmpty()) {
            return false;
        }
        // limit -1 keeps trailing empty levels, so "a/" is two levels rather than one
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        boolean wildcardFirst = "#".equals(filterLevels[0]) || "+".equals(filterLevels[0]);
        if (wildcardFirst && topic.charAt(0) == '$') {
            return false;
        }

        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if ("#".equals(level)) {
                // A multi-level wildcard anywhere but the last level is an invalid filter.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            if (!"+".equals(level) && !level.equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    /**
     * Sends the held message to the subscriber described by {@code topicEntity} when its
     * topic filter matches the message topic.
     *
     * <p>Nothing is sent (and nothing is stored) when no message is set, the storage is
     * {@code null}, or the subscriber has no session/channel. Exceptions are logged and
     * not propagated; {@link Error}s propagate unchanged.
     *
     * @param topicEntity the subscription to deliver to
     */
    private void publishMessage(TopicEntity topicEntity) {
        try {
            MessageEntity message = this.msg;
            if (message == null) {
                LOG.warn("publishMessage skipped, no message set. client id:{}", topicEntity.getClientId());
                return;
            }
            if (!topicMatches(topicEntity.getName(), message.getTopic())) {
                return;
            }
            LOG.info("发送者id : {},  Topic : {}", message.getSenderId(), message.getTopic());

            // Report (and bail out on) a missing storage before the first dereference.
            LOG.info("cache is null:{} client id:{}", Boolean.valueOf(this.cache == null), topicEntity.getClientId());
            if (this.cache == null) {
                return;
            }

            // Resolve the subscriber first so that no QoS 2 state is recorded for a client
            // the message cannot actually be written to.
            ClientSessionEntity session = (ClientSessionEntity) this.cache.get("mqtt-session", topicEntity.getClientId());
            if (session == null || session.getCtx() == null) {
                LOG.warn("publishMessage skipped, no active session. client id:{}", topicEntity.getClientId());
                return;
            }

            int qos = MqttUtils.getQos(message.getQos(), topicEntity.getQos());
            if (qos == 2) {
                MessageEntity copy = (MessageEntity) SerializationUtils.clone(message);
                // Re-attach the sender channel to the copy (presumably it is not carried
                // through serialization - the original code did the same).
                copy.setSenderChannel(message.getSenderChannel());
                this.cache.put("mqtt-qos2-message", topicEntity.getClientId(), copy);
            }

            int payloadLength = message.getPayload() == null ? 0 : message.getPayload().length;
            LOG.info("订阅者id : {},  Topic : {}, 发送内容长度： {}", topicEntity.getClientId(), topicEntity.getName(), Integer.valueOf(payloadLength));

            // The PUBLISH carries the message's concrete topic, not the subscription filter:
            // a filter may contain wildcards, which are not allowed in a PUBLISH topic name.
            PublishEntity outbound = new PublishEntity(MqttQoS.valueOf(qos), message.getTopic(), Integer.valueOf(session.generateMessageId()), message.getPayload(), false);
            session.getCtx().writeAndFlush(response(outbound));
        } catch (Exception e) {
            LOG.warn("publishMessage 方法出现错误 : ", e);
        }
    }

    /**
     * Builds the netty PUBLISH packet for the given entity.
     *
     * @param publishEntity carrier of dup flag, QoS, topic name, message id and payload
     * @return a PUBLISH message with RETAIN unset; the payload is an empty buffer when the
     *         entity has no payload or the payload cannot be wrapped
     */
    private MqttPublishMessage response(PublishEntity publishEntity) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, publishEntity.getDup().booleanValue(), publishEntity.getQos(), false, 0);
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(publishEntity.getTopicName(), publishEntity.getMessageId().intValue());
        ByteBuf payload = Unpooled.EMPTY_BUFFER;
        if (publishEntity.getPayload() != null) {
            try {
                payload = Unpooled.wrappedBuffer(publishEntity.getPayload());
            } catch (IllegalArgumentException e) {
                // Keep the empty buffer rather than handing a null payload to the packet.
                LOG.warn("response: payload could not be wrapped, sending empty payload", e);
            }
        }
        return new MqttPublishMessage(fixedHeader, variableHeader, payload);
    }

    /**
     * Observer callback. Only {@link TopicEntity} notifications are acted upon; anything
     * else is ignored.
     *
     * @param iObservable the notifying observable (unused)
     * @param obj         the notification payload
     */
    @Override // cool.taomu.mqtt.broker.utils.inter.IObserver
    public void publish(IObservable<?> iObservable, Object obj) {
        if (obj instanceof TopicEntity) {
            publishMessage((TopicEntity) obj);
        }
    }

    /** @return the logger used by this class */
    @Pure
    public Logger getLOG() {
        return LOG;
    }

    /** @return the storage used for session lookup and QoS 2 bookkeeping */
    @Pure
    public IStorage getCache() {
        return this.cache;
    }

    /** @param iStorage the storage to use; with {@code null} nothing will be delivered */
    public void setCache(IStorage iStorage) {
        this.cache = iStorage;
    }

    /** @return the message delivered by this observer, or {@code null} if none is set */
    @Pure
    public MessageEntity getMsg() {
        return this.msg;
    }

    /** @param messageEntity the message to deliver to matching subscriptions */
    public void setMsg(MessageEntity messageEntity) {
        this.msg = messageEntity;
    }

    /** @return this instance's number (0 unless created via the no-argument constructor or set) */
    @Pure
    public int getNumber() {
        return this.number;
    }

    /** @param i the instance number to assign */
    public void setNumber(int i) {
        this.number = i;
    }
}
