package fs2.concurrent;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Unique;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$NestedStreamOps$;
import fs2.concurrent.Broadcast;
import fs2.concurrent.PubSub;
import java.io.Serializable;
import scala.$less$colon$less$;
import scala.DummyImplicit$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.util.NotGiven$;

/* compiled from: Broadcast.scala */
/* loaded from: input_file:fs2/concurrent/Broadcast$.class */
/**
 * Module (singleton) class backing {@code fs2.concurrent.Broadcast}, as recovered from bytecode.
 *
 * <p>It exposes two pipes: {@link #apply(int, GenConcurrent)}, which turns a source stream into a
 * stream of subscriber streams fed through a {@code PubSub}, and
 * {@link #through(Seq, GenConcurrent)}, which runs a fixed list of pipes over those subscriber
 * streams and joins their outputs.
 */
public final class Broadcast$ implements Serializable {
    // NOTE(review): this accessor for the nested State module is null in the decompiled output;
    // presumably the real value is supplied elsewhere (decompiler artifact) — confirm before use.
    public static final Broadcast$State$ fs2$concurrent$Broadcast$$$State = null;

    /** The single instance of this module. */
    public static final Broadcast$ MODULE$ = new Broadcast$();

    private Broadcast$() {
    }

    /** Serializes the module as a {@link ModuleSerializationProxy} for this class. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(Broadcast$.class);
    }

    /**
     * Builds a pipe that broadcasts a source stream through a {@code PubSub}.
     *
     * <p>The returned function creates a {@code PubSub} whose strategy is {@code strategy(i)}
     * wrapped in {@code closeDrainFirst}, then emits the subscriber stream via
     * {@code Stream.constant} while the source is published concurrently.
     *
     * @param i             number of subscribers that must have called {@code get} before the
     *                      strategy leaves its initial await-subscribers state
     * @param genConcurrent concurrency type-class instance for the effect {@code F}
     * @return a function from the source stream to a stream of subscriber streams
     */
    public <F, O> Function1<Stream<F, O>, Stream<F, Stream<F, O>>> apply(int i, GenConcurrent<F, Throwable> genConcurrent) {
        return stream -> {
            return Stream$.MODULE$.eval(PubSub$.MODULE$.apply(PubSub$Strategy$.MODULE$.closeDrainFirst(strategy(i)), genConcurrent)).flatMap(pubSub -> {
                return Stream$.MODULE$.constant(subscriber$4(genConcurrent, pubSub), Stream$.MODULE$.constant$default$2()).concurrently(publish$2(genConcurrent, stream, pubSub), genConcurrent);
            }, NotGiven$.MODULE$.value());
        };
    }

    /**
     * Builds a pipe that sends the source through every pipe in {@code seq} and joins the results.
     *
     * <p>It broadcasts with {@code apply(seq.size(), ...)}, takes exactly {@code seq.size()}
     * subscriber streams, pairs each with its index, runs it through the pipe at that index and
     * joins all outputs with {@code parJoinUnbounded}.
     *
     * @param seq           the pipes to run, one per subscriber stream
     * @param genConcurrent concurrency type-class instance for the effect {@code F}
     * @return a function from the source stream to the joined output stream
     * @throws MatchError if a zipped (stream, index) pair is {@code null}
     */
    public <F, O, O2> Function1<Stream<F, O>, Stream<F, O2>> through(Seq<Function1<Stream<F, O>, Stream<F, O2>>> seq, GenConcurrent<F, Throwable> genConcurrent) {
        return stream -> {
            return Stream$NestedStreamOps$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.NestedStreamOps(stream.through(apply(seq.size(), genConcurrent)).take(seq.size()).zipWithIndex().map(tuple2 -> {
                if (tuple2 != null) {
                    // The index comes from zipWithIndex as a long; take(seq.size()) keeps it within seq.
                    return ((Stream) tuple2._1()).through((Function1) seq.apply((int) BoxesRunTime.unboxToLong(tuple2._2())));
                }
                throw new MatchError(tuple2);
            })), genConcurrent);
        };
    }

    /**
     * Creates the {@code PubSub.Strategy} that drives the broadcast state machine.
     *
     * <p>States visible in this file are {@code AwaitSub}, {@code Empty} and {@code Processing};
     * the transitions are documented on the individual strategy methods below.
     *
     * @param i subscriber count threshold checked in {@code get} while in {@code AwaitSub}
     * @return the strategy instance
     */
    private <O> PubSub.Strategy<O, O, Broadcast.State<O>, Unique.Token> strategy(final int i) {
        // An anonymous class implementing an interface cannot take constructor arguments,
        // so the threshold is captured from the (final) parameter instead.
        return new PubSub.Strategy() { // from class: fs2.concurrent.Broadcast$$anon$1
            private final int minReady = i;

            /**
             * Forwards to the implementation inherited from {@code PubSub.Strategy}.
             *
             * <p>The call must be qualified with {@code PubSub.Strategy.super}: an unqualified
             * call would resolve to this override and recurse until the stack overflows.
             */
            @Override // fs2.concurrent.PubSub.Strategy
            public /* bridge */ /* synthetic */ PubSub.Strategy transformSelector(Function2 function2) {
                return PubSub.Strategy.super.transformSelector(function2);
            }

            /** Initial state: {@code AwaitSub} holding an empty token set. */
            @Override // fs2.concurrent.PubSub.Strategy
            public Broadcast.State initial() {
                return Broadcast$State$AwaitSub$.MODULE$.apply(Predef$.MODULE$.Set().empty());
            }

            /** Accepts a value only when the state reports empty and is not awaiting subscribers. */
            @Override // fs2.concurrent.PubSub.Strategy
            public boolean accepts(Object obj, Broadcast.State state) {
                return state.isEmpty() && !state.awaitSub();
            }

            /**
             * Moves to {@code Processing}, seeding all three token sets with the current
             * subscribers and storing {@code obj} as the value being delivered.
             */
            @Override // fs2.concurrent.PubSub.Strategy
            public Broadcast.State publish(Object obj, Broadcast.State state) {
                return Broadcast$State$Processing$.MODULE$.apply(state.subscribers(), state.subscribers(), state.subscribers(), obj);
            }

            /**
             * Handles a {@code get} from the subscriber identified by {@code token}.
             *
             * <ul>
             *   <li>{@code AwaitSub}: adds the token; switches to {@code Empty} once the set size
             *       reaches the threshold; yields no value.</li>
             *   <li>{@code Empty}: adds the token; yields no value.</li>
             *   <li>{@code Processing}: see the inline comments.</li>
             * </ul>
             *
             * @return a pair of (next state, optional value for this subscriber)
             * @throws MatchError if {@code state} is none of the three known states
             */
            @Override // fs2.concurrent.PubSub.Strategy
            public Tuple2 get(Unique.Token token, Broadcast.State state) {
                if (state instanceof Broadcast.State.AwaitSub) {
                    Set<Unique.Token> awaiting = Broadcast$State$AwaitSub$.MODULE$.unapply((Broadcast.State.AwaitSub) state)._1().$plus(token);
                    return awaiting.size() >= this.minReady ? Tuple2$.MODULE$.apply(Broadcast$State$Empty$.MODULE$.apply(awaiting), None$.MODULE$) : Tuple2$.MODULE$.apply(Broadcast$State$AwaitSub$.MODULE$.apply(awaiting), None$.MODULE$);
                }
                if (state instanceof Broadcast.State.Empty) {
                    return Tuple2$.MODULE$.apply(Broadcast$State$Empty$.MODULE$.apply((Set) Broadcast$State$Empty$.MODULE$.unapply((Broadcast.State.Empty) state)._1().$plus(token)), None$.MODULE$);
                }
                if (!(state instanceof Broadcast.State.Processing)) {
                    throw new MatchError(state);
                }
                Broadcast.State.Processing processing = Broadcast$State$Processing$.MODULE$.unapply((Broadcast.State.Processing) state);
                Set<Unique.Token> subscribers = processing._1();
                Set<Unique.Token> awaitingValue = processing._2();
                Set<Unique.Token> outstanding = processing._3();
                Object current = processing._4();
                if (!subscribers.contains(token)) {
                    // Unknown subscriber: register it, mark it outstanding and hand it the current value.
                    return Tuple2$.MODULE$.apply(Broadcast$State$Processing$.MODULE$.apply((Set) subscribers.$plus(token), awaitingValue, (Set) outstanding.$plus(token), current), Some$.MODULE$.apply(current));
                }
                if (awaitingValue.contains(token)) {
                    // Known subscriber that has not yet received the current value: deliver it.
                    return Tuple2$.MODULE$.apply(Broadcast$State$Processing$.MODULE$.apply(subscribers, (Set) awaitingValue.$minus(token), outstanding, current), Some$.MODULE$.apply(current));
                }
                // Subscriber already received the value and is asking again: drop it from the
                // outstanding set; once that set is empty the state returns to Empty.
                Set<Unique.Token> stillOutstanding = outstanding.$minus(token);
                return stillOutstanding.nonEmpty() ? Tuple2$.MODULE$.apply(Broadcast$State$Processing$.MODULE$.apply(subscribers, awaitingValue, stillOutstanding, current), None$.MODULE$) : Tuple2$.MODULE$.apply(Broadcast$State$Empty$.MODULE$.apply(subscribers), None$.MODULE$);
            }

            /** Reports whether the state is empty, as defined by {@code state.isEmpty()}. */
            @Override // fs2.concurrent.PubSub.Strategy
            public boolean empty(Broadcast.State state) {
                return state.isEmpty();
            }

            /** Leaves the state untouched and always answers {@code false}. */
            @Override // fs2.concurrent.PubSub.Strategy
            public Tuple2 subscribe(Unique.Token token, Broadcast.State state) {
                return Tuple2$.MODULE$.apply(state, BoxesRunTime.boxToBoolean(false));
            }

            /**
             * Removes {@code token} from every token set of the current state.
             *
             * <p>In {@code Processing}, if removing the token empties the outstanding set the
             * state becomes {@code Empty} with the remaining subscribers.
             *
             * @throws MatchError if {@code state} is none of the three known states
             */
            @Override // fs2.concurrent.PubSub.Strategy
            public Broadcast.State unsubscribe(Unique.Token token, Broadcast.State state) {
                if (state instanceof Broadcast.State.AwaitSub) {
                    return Broadcast$State$AwaitSub$.MODULE$.apply((Set) Broadcast$State$AwaitSub$.MODULE$.unapply((Broadcast.State.AwaitSub) state)._1().$minus(token));
                }
                if (state instanceof Broadcast.State.Empty) {
                    return Broadcast$State$Empty$.MODULE$.apply((Set) Broadcast$State$Empty$.MODULE$.unapply((Broadcast.State.Empty) state)._1().$minus(token));
                }
                if (!(state instanceof Broadcast.State.Processing)) {
                    throw new MatchError(state);
                }
                Broadcast.State.Processing processing = Broadcast$State$Processing$.MODULE$.unapply((Broadcast.State.Processing) state);
                Set<Unique.Token> subscribers = processing._1();
                Set<Unique.Token> awaitingValue = processing._2();
                Set<Unique.Token> outstanding = processing._3();
                Object current = processing._4();
                Set<Unique.Token> stillOutstanding = outstanding.$minus(token);
                return stillOutstanding.nonEmpty() ? Broadcast$State$Processing$.MODULE$.apply((Set) subscribers.$minus(token), (Set) awaitingValue.$minus(token), stillOutstanding, current) : Broadcast$State$Empty$.MODULE$.apply((Set) subscribers.$minus(token));
            }
        };
    }

    /**
     * Builds one subscriber stream over {@code pubSub}.
     *
     * <p>A fresh unique token is acquired with {@code bracket} and unsubscribed on release; the
     * stream reads {@code pubSub.getStream(token)}, stops at the first {@code None}
     * ({@code unNoneTerminate}) and flattens each received chunk.
     */
    private final Stream subscriber$4(GenConcurrent genConcurrent, PubSub pubSub) {
        return Stream$.MODULE$.bracket(cats.effect.kernel.package$.MODULE$.Concurrent().apply(genConcurrent, DummyImplicit$.MODULE$.dummyImplicit()).unique(), token -> {
            return pubSub.unsubscribe(token);
        }).flatMap(token2 -> {
            return pubSub.getStream(token2).unNoneTerminate($less$colon$less$.MODULE$.refl()).flatMap(chunk -> {
                return Stream$.MODULE$.chunk(chunk);
            }, NotGiven$.MODULE$.value());
        }, NotGiven$.MODULE$.value());
    }

    /**
     * Builds the publisher stream: every chunk of {@code stream} is published wrapped in
     * {@code Some}, and {@code None} is published when the stream finalizes.
     */
    private final Stream publish$2(GenConcurrent genConcurrent, Stream stream, PubSub pubSub) {
        return stream.chunks().evalMap(chunk -> {
            return pubSub.publish(Some$.MODULE$.apply(chunk));
        }).onFinalize(pubSub.publish(None$.MODULE$), genConcurrent);
    }
}
