package onion.mqtt.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.concurrent.atomic.AtomicBoolean;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.manager.SubscribeManager;
import onion.mqtt.server.store.SessionStore;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:onion/mqtt/server/MqttServer.class */
/**
 * Netty-based MQTT server: binds a listening socket, installs the MQTT channel
 * pipeline for accepted connections, and lets the application push PUBLISH
 * messages to subscribed client sessions.
 *
 * <p>{@link #start()} blocks the calling thread until the server channel closes;
 * {@link #stop()} may be called from another thread to shut the server down.
 */
public class MqttServer {
    static final Logger log = LoggerFactory.getLogger(MqttServer.class);

    // Written by start() and cleared by stop(), which may run on different threads.
    private volatile Channel channel;
    private volatile EventLoopGroup bossGroup;
    private volatile EventLoopGroup workGroup;
    private final MqttServerConfig config;
    private final MqttServerBuilder serverBuilder;

    /**
     * Creates a server from the given builder.
     *
     * @param mqttServerBuilder builder supplying the server configuration; it is also
     *                          handed to the inbound handler created in {@link #start()}
     */
    public MqttServer(MqttServerBuilder mqttServerBuilder) {
        this.serverBuilder = mqttServerBuilder;
        this.config = mqttServerBuilder.getConfig();
    }

    /**
     * Publishes a message to every subscriber of a topic whose subscription QoS
     * equals {@code mqttQoS} and that currently has a session with an active channel.
     *
     * @param str     topic name
     * @param bArr    payload bytes; {@code null} or empty sends an empty payload
     * @param mqttQoS QoS of the PUBLISH message
     * @param z       value of the retain flag
     * @return {@code true} if the message was handed to at least one subscriber channel
     *         for writing; the write itself completes asynchronously and failures are
     *         logged rather than reflected in the return value
     */
    public boolean publish(String str, byte[] bArr, MqttQoS mqttQoS, boolean z) {
        return dispatch(null, str, bArr, mqttQoS, z);
    }

    /**
     * Publishes a message on a topic to a single client, provided that client is
     * subscribed to the topic with a subscription QoS equal to {@code mqttQoS}.
     *
     * @param str     id of the client to deliver to; {@code null} delivers to nobody
     * @param str2    topic name
     * @param bArr    payload bytes; {@code null} or empty sends an empty payload
     * @param mqttQoS QoS of the PUBLISH message
     * @param z       value of the retain flag
     * @return {@code true} if the message was handed to the client's channel for
     *         writing; the write itself completes asynchronously and failures are logged
     */
    public boolean publish(String str, String str2, byte[] bArr, MqttQoS mqttQoS, boolean z) {
        if (str == null) {
            log.warn("publish to topic {} skipped: client id is null", str2);
            return false;
        }
        return dispatch(str, str2, bArr, mqttQoS, z);
    }

    /**
     * Builds one PUBLISH message and writes a retained duplicate of it to each
     * matching subscriber's own channel.
     *
     * <p>NOTE(review): subscribers are matched on exact QoS equality and no packet id
     * is set on the message, exactly as before; confirm whether QoS downgrade and
     * packet ids for QoS &gt; 0 are handled elsewhere.
     *
     * @param targetClientId only deliver to this client id, or {@code null} for all subscribers
     * @return {@code true} if at least one write was issued
     */
    private boolean dispatch(String targetClientId, String topic, byte[] payload, MqttQoS qos, boolean retained) {
        boolean hasPayload = payload != null && payload.length > 0;
        // The payload is supplied through the builder instead of being written into the
        // already-built message.
        MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName(topic)
                .qos(qos)
                .retained(retained)
                .payload(hasPayload ? Unpooled.wrappedBuffer(payload) : Unpooled.EMPTY_BUFFER)
                .build();
        AtomicBoolean dispatched = new AtomicBoolean(false);
        try {
            SubscribeManager.getInstance().searchSubscribe(topic).forEach(subscribeStore -> {
                if (targetClientId != null && !targetClientId.equals(subscribeStore.getClientId())) {
                    return;
                }
                if (!qos.equals(subscribeStore.getMqttQoS())) {
                    return;
                }
                SessionStore session = SessionManager.getInstance().getSession(subscribeStore.getClientId());
                if (session == null) {
                    return;
                }
                Channel target = session.getChannel();
                if (target == null || !target.isActive()) {
                    return;
                }
                // Each write consumes one reference, so every subscriber gets its own
                // retained duplicate. The write goes to the subscriber's channel, not to
                // the listening server channel.
                target.writeAndFlush(message.retainedDuplicate()).addListener(future -> {
                    if (!future.isSuccess()) {
                        log.warn("publish to topic {} failed", topic, future.cause());
                    }
                });
                dispatched.set(true);
            });
        } finally {
            // Drop the reference held by this method; the duplicates keep the content
            // alive until their writes complete.
            message.release();
        }
        return dispatched.get();
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * <p>Binds to the configured host and port (or to the port on all interfaces when
     * the host is blank). Whether startup fails or the channel closes normally, all
     * resources are released through {@link #stop()} before this method returns.
     */
    public void start() {
        this.bossGroup = new NioEventLoopGroup(this.config.getBossThread());
        this.workGroup = new NioEventLoopGroup(this.config.getWorkThread());
        MqttServerChannelInitializer mqttServerChannelInitializer = new MqttServerChannelInitializer(
                this.serverBuilder.getConfig(), new MqttServerInboundHandler(this.serverBuilder));
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(this.bossGroup, this.workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(mqttServerChannelInitializer);
            String host = this.config.getHost();
            ChannelFuture bindFuture = StringUtils.isNotBlank(host)
                    ? serverBootstrap.bind(host, this.config.getPort())
                    : serverBootstrap.bind(this.config.getPort());
            // sync() rethrows a bind failure, so reaching the next line means the bind succeeded.
            bindFuture.sync();
            this.channel = bindFuture.channel();
            log.info("=== mqtt server start success ===");
            this.channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            log.error("=== mqtt server start error: {} ===", e.getMessage(), e);
        } catch (Exception e) {
            log.error("=== mqtt server start error: {} ===", e.getMessage(), e);
        } finally {
            // Single cleanup point for both the failure and the normal-close path.
            stop();
        }
    }

    /**
     * Stops the server: closes the server channel and waits for the close to finish,
     * then initiates graceful shutdown of both event loop groups. Calling it again
     * after everything has been released does nothing.
     */
    public void stop() {
        Channel serverChannel;
        EventLoopGroup boss;
        EventLoopGroup work;
        // Take ownership of the resources atomically so concurrent stop() calls do not
        // tear the same objects down twice.
        synchronized (this) {
            serverChannel = this.channel;
            boss = this.bossGroup;
            work = this.workGroup;
            this.channel = null;
            this.bossGroup = null;
            this.workGroup = null;
        }
        if (serverChannel == null && boss == null && work == null) {
            return;
        }
        try {
            if (serverChannel != null) {
                // Close explicitly before shutting down the event loops.
                serverChannel.close().syncUninterruptibly();
            }
        } finally {
            if (boss != null) {
                boss.shutdownGracefully();
            }
            if (work != null) {
                work.shutdownGracefully();
            }
        }
        log.info("mqtt server stopped");
    }
}
