package cool.taomu.mqtt.broker.factory;

import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.storage.inter.IStorage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cool/taomu/mqtt/broker/factory/PubRecRequest.class */
/**
 * Handles an incoming MQTT PUBREC packet.
 *
 * <p>For the client that sent the PUBREC, the handler looks up the entries stored under
 * {@code <clientId>#<messageId>}, writes a PUBREC with the same message id to the channel
 * recorded on each entry as its sender, removes the entry from under the current client's key
 * and stores it again under {@code <senderId>#<messageId>} with the sender channel and sender
 * id replaced by those of the current client.
 *
 * <p>NOTE(review): the re-keying presumably lets the follow-up PUBREL/PUBCOMP exchange find the
 * entry from the other side of the flow; confirm against the sibling handlers.
 */
public class PubRecRequest implements IProcess {
    private static final Logger LOG = LoggerFactory.getLogger(PubRecRequest.class);

    /** Name passed as the first argument to every storage call made by this handler. */
    private static final String QOS2_MESSAGE_STORE = "mqtt-qos2-message";

    private final IStorage cache = new DataStorage();

    /**
     * Processes a PUBREC received on the given channel.
     *
     * @param channelHandlerContext context of the channel the PUBREC arrived on; its channel is
     *     used to resolve the client id and becomes the new sender channel of each moved entry
     * @param mqttMessage the PUBREC packet; its variable header must be a
     *     {@link MqttMessageIdVariableHeader}
     */
    @Override // cool.taomu.mqtt.broker.factory.IProcess
    public void request(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) {
        String clientId = MqttUtils.getClientId(channelHandlerContext.channel());
        LOG.info("执行了MQTT PubRec 命令 : {}", clientId);
        int messageId = ((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId();
        String receiverKey = storageKey(clientId, messageId);

        Object stored = this.cache.get(QOS2_MESSAGE_STORE, receiverKey);
        if (stored == null) {
            // Nothing is stored for this client/message id (e.g. a repeated PUBREC after the
            // entry was already moved); ignore it instead of failing with a NullPointerException.
            LOG.warn("No QoS 2 message stored for client {} and message id {}; ignoring PUBREC",
                    clientId, messageId);
            return;
        }

        // Iterate over a snapshot: the remove/put calls below may touch the very collection that
        // was returned by the storage, which would invalidate a live iterator.
        Object[] pending = ((HashSet<?>) stored).toArray();
        for (Object entry : pending) {
            MessageEntity messageEntity = (MessageEntity) entry;

            // Relay the acknowledgement to the channel recorded as the sender of this entry.
            messageEntity.getSenderChannel().writeAndFlush(new MqttMessage(
                    new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(messageId)));

            this.cache.remove(QOS2_MESSAGE_STORE, receiverKey);

            // Swap the roles: the current client becomes the recorded sender, and the entry is
            // stored again under the previous sender's key.
            messageEntity.setSenderChannel(channelHandlerContext.channel());
            String senderId = messageEntity.getSenderId();
            messageEntity.setSenderId(clientId);
            this.cache.put(QOS2_MESSAGE_STORE, storageKey(senderId, messageId), messageEntity);
        }
    }

    /** Builds the storage key {@code <id>#<messageId>} exactly as the original inline expression did. */
    private static String storageKey(String id, int messageId) {
        return IterableExtensions.join(
                Collections.unmodifiableList(
                        CollectionLiterals.newArrayList(new Object[]{id, Integer.valueOf(messageId)})),
                "#");
    }
}
