package com.mastfrog.acteur.pubsub;

import com.mastfrog.acteur.pubsub.BusListener;
import com.mastfrog.marshallers.netty.NettyContentMarshallers;
import com.mastfrog.shutdown.hooks.ShutdownHookRegistry;
import com.mastfrog.util.preconditions.Checks;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;

/**
 * Publish/subscribe bus that delivers marshalled messages to subscribed
 * channels as binary websocket frames.
 * <p>
 * Subscriptions are tracked in a {@link ChannelRegistry}; messages are
 * serialized once with the injected {@link NettyContentMarshallers} and then
 * written to each recipient channel in turn by a {@link CHF} chain that is
 * started on the injected "bus" executor.
 */
@Singleton
class Bus implements PubSubBus {
    private final ChannelRegistry<ChannelId> reg;
    private final NettyContentMarshallers marshallers;
    private final ByteBufAllocator alloc;
    private final ExecutorService threadPool;
    private final BusListener.Registry listeners;

    /**
     * Writes one frame to a sequence of channels, one channel at a time: each
     * completed write triggers the write to the next writable channel.
     * <p>
     * The instance takes ownership of the frame passed to its constructor and
     * releases that reference exactly once, when the chain finishes (whether
     * successfully or not). Every individual write is given its own retained
     * duplicate of the frame.
     * <p>
     * The promise is completed successfully only after the final write has
     * completed; it is failed with the cause of the first write that fails, at
     * which point no further channels are written to.
     */
    public static class CHF implements ChannelFutureListener {
        private final Iterator<Channel> channels;
        private final WebSocketFrame frame;
        private final ChannelPromise prom;
        // Guards against releasing the owned frame reference more than once.
        private boolean frameReleased;

        /**
         * @param it the channels to write to, in iteration order
         * @param webSocketFrame the frame to send; this object releases it
         * when the chain terminates
         * @param channelPromise completed when the chain terminates
         */
        CHF(Iterator<Channel> it, WebSocketFrame webSocketFrame, ChannelPromise channelPromise) {
            this.channels = it;
            this.frame = webSocketFrame;
            this.prom = channelPromise;
        }

        /**
         * Advances the chain. Called with {@code null} to start it, and by
         * Netty with the completed future of the previous write thereafter.
         *
         * @param channelFuture the previous write's future, or {@code null}
         * for the initial invocation
         */
        @Override
        public void operationComplete(ChannelFuture channelFuture) {
            if (channelFuture != null && !channelFuture.isSuccess()) {
                fail(channelFuture.cause());
                return;
            }
            try {
                Channel target = nextWritableChannel();
                if (target == null) {
                    // Nothing left to write to, and the previous write (if
                    // any) has completed - the whole chain is done.
                    succeed();
                    return;
                }
                // Register this listener exactly once per write, so that the
                // chain advances a single step per completed write.
                target.writeAndFlush(this.frame.retainedDuplicate()).addListener(this);
            } catch (RuntimeException e) {
                // Without this, a failure here would be swallowed by the
                // executor or event loop and the promise would never complete.
                fail(e);
            }
        }

        /**
         * Finds the next channel that reports itself writable, skipping any
         * that do not.
         *
         * @return the next writable channel, or {@code null} if none remain
         */
        private Channel nextWritableChannel() {
            while (this.channels.hasNext()) {
                Channel candidate = this.channels.next();
                if (candidate.isWritable()) {
                    return candidate;
                }
            }
            return null;
        }

        /** Terminates the chain successfully. */
        private void succeed() {
            releaseFrame();
            // trySuccess tolerates a promise the caller already completed or cancelled
            this.prom.trySuccess();
        }

        /** Terminates the chain with the given failure. */
        private void fail(Throwable cause) {
            releaseFrame();
            // tryFailure tolerates a promise the caller already completed or cancelled
            this.prom.tryFailure(cause);
        }

        /** Releases the reference to the frame owned by this chain, once. */
        private synchronized void releaseFrame() {
            if (!this.frameReleased) {
                this.frameReleased = true;
                this.frame.release();
            }
        }
    }

    /**
     * Creates the bus. A new {@link ChannelRegistry} is created for it using
     * the passed shutdown hook registry.
     *
     * @param shutdownHookRegistry passed to the channel registry
     * @param nettyContentMarshallers serializes published messages
     * @param byteBufAllocator allocates the buffers messages are written into
     * @param executorService the executor delivery chains are started on
     * @param registry listeners notified of subscribe, unsubscribe and publish
     */
    @Inject
    Bus(ShutdownHookRegistry shutdownHookRegistry, NettyContentMarshallers nettyContentMarshallers, ByteBufAllocator byteBufAllocator, @Named("bus") ExecutorService executorService, BusListener.Registry registry) {
        this.reg = new ChannelRegistry<>(shutdownHookRegistry);
        this.marshallers = nettyContentMarshallers;
        this.alloc = byteBufAllocator;
        this.threadPool = executorService;
        this.listeners = registry;
    }

    /**
     * Subscribes a channel to a bus channel id. Listeners are notified of the
     * subscription before the channel is registered, and are notified of an
     * unsubscribe when the channel closes.
     * <p>
     * NOTE(review): the close listener is not removed by
     * {@link #unsubscribe}, so a channel that is explicitly unsubscribed and
     * later closed produces two {@code onUnsubscribe} notifications.
     *
     * @param channel the channel that should receive messages
     * @param channelId the id to subscribe it to
     * @return the future returned by the registry's {@code register} call
     */
    @Override
    public Future<Boolean> subscribe(Channel channel, ChannelId channelId) {
        this.listeners.onSubscribe(channelId, channel);
        channel.closeFuture().addListener(channelFuture -> {
            this.listeners.onUnsubscribe(channelId, channel);
        });
        return this.reg.register(channelId, channel);
    }

    /**
     * Unsubscribes a channel from a bus channel id, notifying listeners first.
     *
     * @param channel the channel to remove
     * @param channelId the id to unsubscribe it from
     * @return the future returned by the registry's {@code unsubscribe} call
     */
    @Override
    public Future<Boolean> unsubscribe(Channel channel, ChannelId channelId) {
        this.listeners.onUnsubscribe(channelId, channel);
        return this.reg.unsubscribe(channelId, channel);
    }

    /**
     * Determines whether the registry currently reports any channels for the
     * passed id.
     *
     * @param channelId the id to look up
     * @return true if the registry's channels for the id are not empty
     */
    public boolean hasSubscribers(ChannelId channelId) {
        return !this.reg.channels(channelId).isEmpty();
    }

    /**
     * Publishes a message to every channel subscribed to any of the passed
     * ids, except the originating channel.
     * <p>
     * The message is marshalled even when there turn out to be no recipients,
     * so marshalling errors are always reported to the caller.
     *
     * @param t the message
     * @param channel the originating channel, which does not receive the
     * message and which supplies the returned promise
     * @param set the ids to publish to; must not be empty
     * @return a promise completed when delivery finishes or fails
     * @throws Exception if the message cannot be marshalled
     */
    @Override
    public <T> ChannelPromise publish(T t, Channel channel, Set<ChannelId> set) throws Exception {
        Checks.notEmpty("to", set);
        BinaryWebSocketFrame frame = encode(t);
        ChannelPromise promise;
        boolean handedOff = false;
        try {
            Set<Channel> recipients = new HashSet<>(50);
            for (ChannelId id : set) {
                recipients.addAll(this.reg.channels(id));
            }
            recipients.remove(channel);
            promise = channel.newPromise();
            if (recipients.isEmpty()) {
                promise.setSuccess();
            } else {
                dispatch(frame, recipients, promise);
                handedOff = true;
            }
        } finally {
            // If the frame was not handed to a delivery chain (no recipients,
            // or an exception above), nobody else will ever release it.
            if (!handedOff) {
                frame.release();
            }
        }
        this.listeners.onPublish(t, set, channel);
        return promise;
    }

    /**
     * Broadcasts a message to every registered channel, using a fresh
     * {@link EmbeddedChannel} as the originating channel.
     *
     * @param t the message
     * @return a promise completed when delivery finishes or fails
     * @throws Exception if the message cannot be marshalled
     */
    @Override
    public <T> ChannelPromise broadcast(T t) throws Exception {
        return broadcast(t, new EmbeddedChannel());
    }

    /**
     * Broadcasts a message to every registered channel except the originating
     * one. The message is only marshalled if there is at least one recipient.
     *
     * @param t the message
     * @param channel the originating channel, which does not receive the
     * message and which supplies the returned promise
     * @return a promise completed when delivery finishes or fails
     * @throws Exception if the message cannot be marshalled
     */
    @Override
    public <T> ChannelPromise broadcast(T t, Channel channel) throws Exception {
        ChannelPromise promise = channel.newPromise();
        Set<Channel> recipients = new HashSet<>(this.reg.allChannels());
        recipients.remove(channel);
        if (recipients.isEmpty()) {
            return promise.setSuccess();
        }
        BinaryWebSocketFrame frame = encode(t);
        boolean handedOff = false;
        try {
            dispatch(frame, recipients, promise);
            handedOff = true;
        } finally {
            // The executor rejected the task; no chain exists to release the frame.
            if (!handedOff) {
                frame.release();
            }
        }
        return promise;
    }

    /**
     * Marshals a message into a newly allocated buffer and wraps it in a
     * binary websocket frame. The buffer is released if marshalling fails.
     */
    private <T> BinaryWebSocketFrame encode(T message) throws Exception {
        ByteBuf buffer = this.alloc.buffer();
        boolean written = false;
        try {
            this.marshallers.write(message, buffer, new Object[0]);
            written = true;
        } finally {
            if (!written) {
                buffer.release();
            }
        }
        return new BinaryWebSocketFrame(buffer);
    }

    /**
     * Starts a delivery chain for the frame on the bus executor. Once this
     * method returns normally, the chain owns the frame.
     */
    private void dispatch(BinaryWebSocketFrame frame, Set<Channel> recipients, ChannelPromise promise) {
        CHF chain = new CHF(recipients.iterator(), frame, promise);
        this.threadPool.submit(() -> {
            chain.operationComplete((ChannelFuture) null);
        });
    }
}
